Merged -r572:573 from trunk
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@836 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
new file mode 100644
index 0000000..bfb70ea
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public interface IJobletEventListener {
+ public void jobletStart();
+
+ public void jobletFinish();
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
new file mode 100644
index 0000000..3b3cacb
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IJobletEventListenerFactory extends Serializable {
+ public IJobletEventListener createListener(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index ef44033..23b33f0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -58,6 +58,8 @@
private int maxAttempts;
+ private IJobletEventListenerFactory jobletEventListenerFactory;
+
private transient int operatorIdCounter;
private transient int connectorIdCounter;
@@ -215,6 +217,14 @@
return userConstraints;
}
+ public IJobletEventListenerFactory getJobletEventListenerFactory() {
+ return jobletEventListenerFactory;
+ }
+
+ public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+ this.jobletEventListenerFactory = jobletEventListenerFactory;
+ }
+
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 381c44b..7d39bfd 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
@@ -72,6 +73,8 @@
private final IWorkspaceFileFactory fileFactory;
+ private IJobletEventListener jobletEventListener;
+
public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx) {
this.nodeController = nodeController;
this.appCtx = appCtx;
@@ -235,4 +238,12 @@
collector.addPartitions(Collections.singleton(channel));
}
}
+
+ public IJobletEventListener getJobletEventListener() {
+ return jobletEventListener;
+ }
+
+ public void setJobletEventListener(IJobletEventListener jobletEventListener) {
+ this.jobletEventListener = jobletEventListener;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 6549a74..e8c4052 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -56,7 +56,6 @@
task.abort();
}
}
- ji.close();
}
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
index 50c05df..7ae27ba 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
@@ -18,6 +18,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
@@ -43,6 +44,10 @@
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet joblet = jobletMap.remove(jobId);
if (joblet != null) {
+ IJobletEventListener listener = joblet.getJobletEventListener();
+ if (listener != null) {
+ listener.jobletFinish();
+ }
ncs.getPartitionManager().unregisterPartitions(jobId);
joblet.close();
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 153eaff..034be8b 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -98,7 +100,7 @@
}
};
- final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
+ final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jag);
for (TaskAttemptDescriptor td : taskDescriptors) {
TaskAttemptId taId = td.getTaskAttemptId();
@@ -157,11 +159,18 @@
}
}
- private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx) throws Exception {
+ private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag)
+ throws Exception {
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
ji = new Joblet(ncs, jobId, appCtx);
+ IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
+ if (jelf != null) {
+ IJobletEventListener listener = jelf.createListener(ji);
+ ji.setJobletEventListener(listener);
+ listener.jobletStart();
+ }
jobletMap.put(jobId, ji);
}
return ji;