Fixed application deployment. Fault recovery works correctly.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@303 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
index 7cdaee1..4d9b5a0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/CCConfig.java
@@ -32,6 +32,6 @@
@Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
public int profileDumpPeriod = 0;
- @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
- public boolean useJOL = false;
+ @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not specified in the job specification. (default: 5)")
+ public int defaultMaxJobAttempts = 5;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index bf1f0bd..59072f8 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -53,7 +53,7 @@
private final Set<ConstraintExpression> userConstraints;
- private int maxRetries;
+ private int maxAttempts;
public JobSpecification() {
roots = new ArrayList<OperatorDescriptorId>();
@@ -173,12 +173,12 @@
return roots;
}
- public void setMaxRetries(int maxRetries) {
- this.maxRetries = maxRetries;
+ public void setMaxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
}
- public int getMaxRetries() {
- return maxRetries;
+ public int getMaxAttempts() {
+ return maxAttempts;
}
public void addUserConstraint(ConstraintExpression constraint) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8772940..a4aa482 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -58,6 +58,7 @@
import edu.uci.ics.hyracks.control.cc.job.manager.events.StageletCompleteEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.StageletFailureEvent;
import edu.uci.ics.hyracks.control.cc.job.manager.events.UnregisterNodeEvent;
+import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
import edu.uci.ics.hyracks.control.cc.jobqueue.JobQueue;
import edu.uci.ics.hyracks.control.cc.scheduler.IScheduler;
import edu.uci.ics.hyracks.control.cc.scheduler.naive.NaiveScheduler;
@@ -248,14 +249,16 @@
@Override
public void destroyApplication(String appName) throws Exception {
- ApplicationDestroyEvent de = new ApplicationDestroyEvent(this, appName);
- jobQueue.scheduleAndSync(de);
+ FutureValue fv = new FutureValue();
+ jobQueue.schedule(new ApplicationDestroyEvent(this, appName, fv));
+ fv.get();
}
@Override
public void startApplication(final String appName) throws Exception {
- ApplicationStartEvent r = new ApplicationStartEvent(this, appName);
- jobQueue.scheduleAndSync(r);
+ FutureValue fv = new FutureValue();
+ jobQueue.schedule(new ApplicationStartEvent(this, appName, fv));
+ fv.get();
}
@Override
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
index ce22334..69c244a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationDestroyEvent.java
@@ -19,33 +19,56 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationDestroyer;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
-public class ApplicationDestroyEvent extends SynchronizableRunnable {
+public class ApplicationDestroyEvent implements Runnable {
private final ClusterControllerService ccs;
private final String appName;
+ private final FutureValue fv; // completion handle; assigned once in the constructor
- public ApplicationDestroyEvent(ClusterControllerService ccs, String appName) {
+ public ApplicationDestroyEvent(ClusterControllerService ccs, String appName, FutureValue fv) {
this.ccs = ccs;
this.appName = appName;
+ this.fv = fv;
}
@Override
- protected void doRun() throws Exception {
- ApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
+ public void run() {
+ final ApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
if (appCtx == null) {
- throw new HyracksException("No application with name: " + appName);
+ fv.setException(new HyracksException("No application with name: " + appName));
+ return;
}
List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
for (final String nodeId : ccs.getNodeMap().keySet()) {
opList.add(new ApplicationDestroyer(nodeId, appName));
}
- RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
- RemoteRunner.runRemote(ccs, ops, null);
- appCtx.deinitialize();
+ final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RemoteRunner.runRemote(ccs, ops, null);
+ } catch (Exception e) {
+ fv.setException(e);
+ return;
+ }
+ ccs.getJobQueue().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ appCtx.deinitialize();
+ fv.setValue(null); // signal success only when deinitialize() did not throw
+ } catch (Exception e) {
+ fv.setException(e); // kept as the final outcome; setValue() would reset it to null
+ }
+ }
+ });
+ }
+ });
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
index db9e150..9f4ad8f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ApplicationStartEvent.java
@@ -20,36 +20,53 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.jobqueue.SynchronizableRunnable;
+import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationStarter;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
-public class ApplicationStartEvent extends SynchronizableRunnable {
+public class ApplicationStartEvent implements Runnable {
private final ClusterControllerService ccs;
private final String appName;
+ private final FutureValue fv;
- public ApplicationStartEvent(ClusterControllerService ccs, String appName) {
+ public ApplicationStartEvent(ClusterControllerService ccs, String appName, FutureValue fv) {
this.ccs = ccs;
this.appName = appName;
+ this.fv = fv;
}
@Override
- protected void doRun() throws Exception {
+ public void run() {
ApplicationContext appCtx = ccs.getApplicationMap().get(appName);
if (appCtx == null) {
- throw new HyracksException("No application with name: " + appName);
+ fv.setException(new HyracksException("No application with name: " + appName));
+ return;
}
- appCtx.initializeClassPath();
- appCtx.initialize();
- final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDestributedState());
- final boolean deployHar = appCtx.containsHar();
- List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
- for (final String nodeId : ccs.getNodeMap().keySet()) {
- opList.add(new ApplicationStarter(nodeId, appName, deployHar, distributedState));
+ try {
+ appCtx.initializeClassPath();
+ appCtx.initialize();
+ final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDestributedState());
+ final boolean deployHar = appCtx.containsHar();
+ List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
+ for (final String nodeId : ccs.getNodeMap().keySet()) {
+ opList.add(new ApplicationStarter(nodeId, appName, deployHar, distributedState));
+ }
+ final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RemoteRunner.runRemote(ccs, ops, null);
+ fv.setValue(null);
+ } catch (Exception e) {
+ fv.setException(e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ fv.setException(e);
}
- RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
- RemoteRunner.runRemote(ccs, ops, null);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java
index 9cc01bb..7dc6496 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAttemptStartEvent.java
@@ -33,8 +33,11 @@
@Override
public void run() {
JobRun run = ccs.getRunMap().get(jobId);
- int maxRetries = run.getJobPlan().getJobSpecification().getMaxRetries();
- if (run.getAttempts().size() > maxRetries) {
+ int maxAttempts = run.getJobPlan().getJobSpecification().getMaxAttempts();
+ if (maxAttempts == 0) {
+ maxAttempts = ccs.getConfig().defaultMaxJobAttempts;
+ }
+ if (run.getAttempts().size() > maxAttempts) {
run.setStatus(JobStatus.FAILURE);
return;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index 718b913..c4ed077 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -33,43 +32,55 @@
private ClusterControllerService ccs;
private UUID jobId;
private int attempt;
+ private JobStatus status;
- public JobCleanupEvent(ClusterControllerService ccs, UUID jobId, int attempt) {
+ public JobCleanupEvent(ClusterControllerService ccs, UUID jobId, int attempt, JobStatus status) {
this.ccs = ccs;
this.jobId = jobId;
this.attempt = attempt;
+ this.status = status;
}
@Override
public void run() {
- JobRun run = ccs.getRunMap().get(jobId);
- JobAttempt ja = run.getAttempts().get(attempt);
+ final JobRun run = ccs.getRunMap().get(jobId);
+ final JobAttempt ja = run.getAttempts().get(attempt);
Set<String> targetNodes = ja.getParticipatingNodeIds();
- if (!targetNodes.isEmpty()) {
- JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
- int i = 0;
- for (String n : targetNodes) {
- jcns[i++] = new JobCompleteNotifier(n, jobId);
- }
- try {
- RemoteRunner.runRemote(ccs, jcns, null);
- } catch (Exception e) {
- e.printStackTrace();
- }
- CCApplicationContext appCtx = ccs.getApplicationMap().get(ja.getPlan().getApplicationName());
- if (appCtx != null) {
- try {
- appCtx.notifyJobFinish(jobId);
- } catch (HyracksException e) {
- e.printStackTrace();
- }
- }
- Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
- for (String nodeId : ja.getParticipatingNodeIds()) {
- NodeControllerState state = nodeMap.get(nodeId);
- state.getActiveJobIds().remove(jobId);
- }
+ final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
+ int i = 0;
+ for (String n : targetNodes) {
+ jcns[i++] = new JobCompleteNotifier(n, jobId);
}
- run.setStatus(JobStatus.TERMINATED);
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ if (jcns.length > 0) {
+ try {
+ RemoteRunner.runRemote(ccs, jcns, null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ ccs.getJobQueue().schedule(new Runnable() {
+ @Override
+ public void run() {
+ CCApplicationContext appCtx = ccs.getApplicationMap().get(ja.getPlan().getApplicationName());
+ if (appCtx != null) {
+ try {
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
+ }
+ }
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ for (String nodeId : ja.getParticipatingNodeIds()) {
+ NodeControllerState state = nodeMap.get(nodeId);
+ state.getActiveJobIds().remove(jobId);
+ }
+ run.setStatus(status);
+ }
+ });
+ }
+ });
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 43e8e0e..11850ee 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -18,11 +18,16 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
public class RemoveDeadNodesEvent implements Runnable {
+ private static final Logger LOGGER = Logger.getLogger(RemoveDeadNodesEvent.class.getName());
+
private final ClusterControllerService ccs;
public RemoveDeadNodesEvent(ClusterControllerService ccs) {
@@ -37,11 +42,13 @@
NodeControllerState state = e.getValue();
if (state.incrementLastHeartbeatDuration() >= ccs.getConfig().maxHeartbeatLapsePeriods) {
deadNodes.add(e.getKey());
+ LOGGER.info(e.getKey() + " considered dead");
}
}
for (String deadNode : deadNodes) {
NodeControllerState state = nodeMap.remove(deadNode);
for (final UUID jid : state.getActiveJobIds()) {
+ LOGGER.info("Aborting: " + jid);
ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jid));
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java
index 1a4cbc5..c4839da 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/ScheduleRunnableStagesEvent.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobAttempt;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -64,7 +65,7 @@
+ scheduledStages);
if (pendingStages.size() == 1 && scheduledStages.isEmpty()) {
LOGGER.info(jobId + ":" + attempt + ":No more runnable stages");
- ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobId, attempt));
+ ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobId, attempt, JobStatus.TERMINATED));
return;
}
@@ -90,13 +91,14 @@
} catch (HyracksException e) {
e.printStackTrace();
ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jobId));
+ return;
}
- JobPlan plan = run.getJobPlan();
- for (JobStageAttempt jsa : runnableStageAttempts) {
+ final JobPlan plan = run.getJobPlan();
+ for (final JobStageAttempt jsa : runnableStageAttempts) {
ISchedule schedule = jsa.getSchedule();
- Map<OperatorDescriptorId, Integer> partCountMap = new HashMap<OperatorDescriptorId, Integer>();
- Map<String, Map<ActivityNodeId, Set<Integer>>> targetMap = new HashMap<String, Map<ActivityNodeId, Set<Integer>>>();
+ final Map<OperatorDescriptorId, Integer> partCountMap = new HashMap<OperatorDescriptorId, Integer>();
+ final Map<String, Map<ActivityNodeId, Set<Integer>>> targetMap = new HashMap<String, Map<ActivityNodeId, Set<Integer>>>();
for (ActivityNodeId aid : jsa.getJobStage().getTasks()) {
String[] locations = schedule.getPartitions(aid);
partCountMap.put(aid.getOperatorDescriptorId(), locations.length);
@@ -116,40 +118,50 @@
}
}
- Phase1Installer p1is[] = new Phase1Installer[targetMap.size()];
- int i = 0;
for (String nid : targetMap.keySet()) {
- p1is[i] = new Phase1Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa.getJobStage()
- .getId(), jsa.getJobAttempt().getAttempt(), targetMap.get(nid), partCountMap);
- ++i;
+ ccs.getNodeMap().get(nid).getActiveJobIds().add(jobId);
}
- LOGGER.info("Stage start - Phase 1");
- try {
- Map<PortInstanceId, Endpoint> globalPortMap = RemoteRunner.runRemote(ccs, p1is,
- new PortMapMergingAccumulator());
- Phase2Installer[] p2is = new Phase2Installer[targetMap.size()];
- Phase3Installer[] p3is = new Phase3Installer[targetMap.size()];
- StageStarter[] ss = new StageStarter[targetMap.size()];
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ Phase1Installer p1is[] = new Phase1Installer[targetMap.size()];
+ int i = 0;
+ for (String nid : targetMap.keySet()) {
+ p1is[i] = new Phase1Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa
+ .getJobStage().getId(), jsa.getJobAttempt().getAttempt(), targetMap.get(nid),
+ partCountMap);
+ ++i;
+ }
+ LOGGER.info("Stage start - Phase 1");
+ try {
+ Map<PortInstanceId, Endpoint> globalPortMap = RemoteRunner.runRemote(ccs, p1is,
+ new PortMapMergingAccumulator());
- i = 0;
- for (String nid : targetMap.keySet()) {
- p2is[i] = new Phase2Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa
- .getJobStage().getId(), targetMap.get(nid), partCountMap, globalPortMap);
- p3is[i] = new Phase3Installer(nid, plan.getJobId(), jsa.getJobStage().getId());
- ss[i] = new StageStarter(nid, plan.getJobId(), jsa.getJobStage().getId());
- ++i;
+ Phase2Installer[] p2is = new Phase2Installer[targetMap.size()];
+ Phase3Installer[] p3is = new Phase3Installer[targetMap.size()];
+ StageStarter[] ss = new StageStarter[targetMap.size()];
+
+ i = 0;
+ for (String nid : targetMap.keySet()) {
+ p2is[i] = new Phase2Installer(nid, plan.getJobId(), plan.getApplicationName(), plan, jsa
+ .getJobStage().getId(), targetMap.get(nid), partCountMap, globalPortMap);
+ p3is[i] = new Phase3Installer(nid, plan.getJobId(), jsa.getJobStage().getId());
+ ss[i] = new StageStarter(nid, plan.getJobId(), jsa.getJobStage().getId());
+ ++i;
+ }
+ LOGGER.info("Stage start - Phase 2");
+ RemoteRunner.runRemote(ccs, p2is, null);
+ LOGGER.info("Stage start - Phase 3");
+ RemoteRunner.runRemote(ccs, p3is, null);
+ LOGGER.info("Stage start");
+ RemoteRunner.runRemote(ccs, ss, null);
+ LOGGER.info("Stage started");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- LOGGER.info("Stage start - Phase 2");
- RemoteRunner.runRemote(ccs, p2is, null);
- LOGGER.info("Stage start - Phase 3");
- RemoteRunner.runRemote(ccs, p3is, null);
- LOGGER.info("Stage start");
- RemoteRunner.runRemote(ccs, ss, null);
- LOGGER.info("Stage started");
- } catch (Exception e) {
- e.printStackTrace();
- }
+ });
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/FutureValue.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/FutureValue.java
new file mode 100644
index 0000000..25378c6
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/jobqueue/FutureValue.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.jobqueue;
+
+public class FutureValue {
+ private boolean done;
+
+ private Object value;
+
+ private Exception e;
+
+ public FutureValue() {
+ done = false;
+ value = null;
+ e = null;
+ }
+
+ public synchronized void setValue(Object value) {
+ done = true;
+ this.value = value;
+ e = null;
+ notifyAll();
+ }
+
+ public synchronized void setException(Exception e) {
+ done = true;
+ this.e = e;
+ value = null;
+ notifyAll();
+ }
+
+ public synchronized void reset() {
+ done = false;
+ value = null;
+ e = null;
+ notifyAll();
+ }
+
+ public synchronized Object get() throws Exception {
+ while (!done) {
+ wait();
+ }
+ if (e != null) {
+ throw e;
+ }
+ return value;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
index 168f1c2..9bd4375 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
@@ -28,8 +28,6 @@
final Semaphore installComplete = new Semaphore(remoteOps.length);
final List<Exception> errors = new Vector<Exception>();
for (final RemoteOp<T> remoteOp : remoteOps) {
- System.err.println(ccs.getNodeMap());
- System.err.println(remoteOp.getNodeId());
NodeControllerState nodeState = ccs.getNodeMap().get(remoteOp.getNodeId());
final INodeController node = nodeState.getNodeController();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
index 89ebe02..f40e8cd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/naive/NaiveScheduler.java
@@ -54,6 +54,9 @@
private ISchedule computeSchedule(JobStageAttempt jsa, Set<OperatorDescriptorId> operators) throws HyracksException {
Set<String> nodeSet = ccs.getNodeMap().keySet();
+ if (nodeSet.isEmpty()) {
+ throw new HyracksException("0 usable nodes found");
+ }
String[] liveNodes = ccs.getNodeMap().keySet().toArray(new String[nodeSet.size()]);
JobAttempt ja = jsa.getJobAttempt();
final JobAttemptState jas = (JobAttemptState) ja.getSchedulerState();
@@ -101,6 +104,9 @@
}
}
}
+ if (unassignedPartsIds.get(part)) {
+ throw new HyracksException("Unsatisfiable constraint for operator: " + oid);
+ }
}
}
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2c262e5..2335c4e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -561,7 +561,7 @@
@Override
public synchronized void abortJoblet(UUID jobId) throws Exception {
- Joblet ji = jobletMap.get(jobId);
+ Joblet ji = jobletMap.remove(jobId);
if (ji != null) {
for (Stagelet stagelet : ji.getStageletMap().values()) {
stagelet.abort();
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 858ff6c..2bb1d80 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -43,7 +43,6 @@
CCConfig ccConfig = new CCConfig();
ccConfig.port = 39001;
ccConfig.profileDumpPeriod = 1000;
- ccConfig.useJOL = true;
cc = new ClusterControllerService(ccConfig);
cc.start();
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 121e126..b2eccdf 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -1,8 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.examples.tpch.client;
import java.io.File;
import java.util.UUID;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
@@ -30,51 +47,80 @@
import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
public class Main {
- public static void main(String[] args) throws Exception {
- String appName = args[0];
- String host;
- int port = 1099;
- switch (args.length) {
- case 3:
- port = Integer.parseInt(args[2]);
- case 2:
- host = args[1];
- break;
- default:
- System.err.println("One or Two arguments expected: <cchost> [<ccport>]");
- return;
- }
- IHyracksClientConnection hcc = new HyracksRMIConnection(host, port);
+ private static class Options {
+ @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+ public String host;
- JobSpecification job = createJob();
+ @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)", required = false)
+ public int port = 1099;
+
+ @Option(name = "-app", usage = "Hyracks Application name", required = true)
+ public String app;
+
+ @Option(name = "-infile-customer-splits", usage = "Comma separated list of file-splits for the CUSTOMER input. A file-split is <node-name>:<path>", required = true)
+ public String inFileCustomerSplits;
+
+ @Option(name = "-infile-order-splits", usage = "Comma separated list of file-splits for the ORDER input. A file-split is <node-name>:<path>", required = true)
+ public String inFileOrderSplits;
+
+ @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+ public String outFileSplits;
+
+ @Option(name = "-num-join-partitions", usage = "Number of Join partitions to use (default: 1)", required = false)
+ public int numJoinPartitions = 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = new Options();
+ CmdLineParser parser = new CmdLineParser(options);
+ parser.parseArgument(args);
+
+ IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+
+ JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
+ parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
+ options.numJoinPartitions);
long start = System.currentTimeMillis();
- UUID jobId = hcc.createJob(appName, job);
+ UUID jobId = hcc.createJob(options.app, job);
hcc.start(jobId);
hcc.waitForCompletion(jobId);
long end = System.currentTimeMillis();
System.err.println(start + " " + end + " " + (end - start));
}
- private static JobSpecification createJob() {
+ private static FileSplit[] parseFileSplits(String fileSplits) {
+ String[] splits = fileSplits.split(",");
+ FileSplit[] fSplits = new FileSplit[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ String s = splits[i].trim();
+ int idx = s.indexOf(':');
+ if (idx < 0) {
+ throw new IllegalArgumentException("File split " + s + " not well formed");
+ }
+ fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+ }
+ return fSplits;
+ }
+
+ private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
+ FileSplit[] resultSplits, int numJoinPartitions) {
JobSpecification spec = new JobSpecification();
- FileSplit[] custSplits = createCustomerFileSplits();
- IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- FileSplit[] ordersSplits = createOrdersFileSplits();
- IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
@@ -99,7 +145,7 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
- createRRPartitionConstraint(spec, ordScanner, 2);
+ createPartitionConstraint(spec, ordScanner, orderSplits);
FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
@@ -107,13 +153,13 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
- createRRPartitionConstraint(spec, custScanner, 2);
+ createPartitionConstraint(spec, custScanner, customerSplits);
InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc,
6000000);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 4);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
@@ -126,10 +172,11 @@
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
groupResultDesc, 16);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, gby, 4);
+ createPartitionConstraint(spec, gby, resultSplits);
- PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, printer, 4);
+ IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
+ FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+ createPartitionConstraint(spec, writer, resultSplits);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 1 },
@@ -147,44 +194,17 @@
spec.connect(joinGroupConn, join, 0, gby, 0);
IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
- spec.connect(gbyPrinterConn, gby, 0, printer, 0);
+ spec.connect(gbyPrinterConn, gby, 0, writer, 0);
- spec.addRoot(printer);
+ spec.addRoot(writer);
return spec;
}
- private static FileSplit[] createOrdersFileSplits() {
- FileSplit fss[] = new FileSplit[2];
- for (int i = 0; i < fss.length; ++i) {
- fss[i] = new FileSplit("foo", new FileReference(new File("data/tpch0.001/orders-part" + (i + 1) + ".tbl")));
+ private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+ String[] parts = new String[splits.length];
+ for (int i = 0; i < splits.length; ++i) {
+ parts[i] = splits[i].getNodeName();
}
- return fss;
- }
-
- private static FileSplit[] createCustomerFileSplits() {
- FileSplit fss[] = new FileSplit[2];
- for (int i = 0; i < fss.length; ++i) {
- fss[i] = new FileSplit("foo",
- new FileReference(new File("data/tpch0.001/customer-part" + (i + 1) + ".tbl")));
- }
- return fss;
- }
-
- private static final String[] NODES = { "NC1", "NC2" };
-
- private static void createRRPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, int nChoices) {
- String[][] choices = new String[2][];
- for (int i = 0; i < choices.length; ++i) {
- choices[i] = createRRSteppedChoiceConstraint(i, nChoices);
- }
- PartitionConstraintHelper.addLocationChoiceConstraint(spec, op, choices);
- }
-
- private static String[] createRRSteppedChoiceConstraint(int index, int nChoices) {
- String[] lcs = new String[nChoices];
- for (int i = 0; i < nChoices; ++i) {
- lcs[i] = NODES[(index + i) % NODES.length];
- }
- return lcs;
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
}
}
\ No newline at end of file