Added job event callback support
git-svn-id: https://hyracks.googlecode.com/svn/trunk@573 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
new file mode 100644
index 0000000..bfb70ea
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public interface IJobletEventListener {
+ public void jobletStart();
+
+ public void jobletFinish();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
new file mode 100644
index 0000000..3b3cacb
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IJobletEventListenerFactory extends Serializable {
+ public IJobletEventListener createListener(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 59072f8..0c9ea26 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -55,6 +55,8 @@
private int maxAttempts;
+ private IJobletEventListenerFactory jobletEventListenerFactory;
+
public JobSpecification() {
roots = new ArrayList<OperatorDescriptorId>();
opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
@@ -189,6 +191,14 @@
return userConstraints;
}
+ public IJobletEventListenerFactory getJobletEventListenerFactory() {
+ return jobletEventListenerFactory;
+ }
+
+ public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+ this.jobletEventListenerFactory = jobletEventListenerFactory;
+ }
+
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 413243e..bb112ed 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
@@ -61,6 +62,8 @@
private final IWorkspaceFileFactory fileFactory;
+ private IJobletEventListener jobletEventListener;
+
public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
this.nodeController = nodeController;
this.appCtx = appCtx;
@@ -210,4 +213,12 @@
return counter;
}
}
+
+ public IJobletEventListener getJobletEventListener() {
+ return jobletEventListener;
+ }
+
+ public void setJobletEventListener(IJobletEventListener jobletEventListener) {
+ this.jobletEventListener = jobletEventListener;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index be48ae6..b2f2364 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -73,6 +73,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -231,7 +233,7 @@
}
};
- final Joblet joblet = getOrCreateLocalJoblet(jobId, attempt, appCtx);
+ final Joblet joblet = getOrCreateLocalJoblet(jobId, attempt, appCtx, plan);
Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
joblet.setStagelet(stageId, stagelet);
@@ -441,11 +443,18 @@
return ji;
}
- private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+ private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx, JobPlan plan)
+ throws Exception {
synchronized (jobletMap) {
Joblet ji = jobletMap.get(jobId);
if (ji == null || ji.getAttempt() != attempt) {
ji = new Joblet(this, jobId, attempt, appCtx);
+ IJobletEventListenerFactory jelf = plan.getJobSpecification().getJobletEventListenerFactory();
+ if (jelf != null) {
+ IJobletEventListener listener = jelf.createListener(ji);
+ ji.setJobletEventListener(listener);
+ listener.jobletStart();
+ }
jobletMap.put(jobId, ji);
}
return ji;
@@ -463,6 +472,10 @@
}
Joblet joblet = jobletMap.remove(jobId);
if (joblet != null) {
+ IJobletEventListener listener = joblet.getJobletEventListener();
+ if (listener != null) {
+ listener.jobletFinish();
+ }
joblet.close();
}
connectionManager.dumpStats();