Added job event callback support

git-svn-id: https://hyracks.googlecode.com/svn/trunk@573 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
new file mode 100644
index 0000000..bfb70ea
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public interface IJobletEventListener {
+    public void jobletStart();
+
+    public void jobletFinish();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
new file mode 100644
index 0000000..3b3cacb
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IJobletEventListenerFactory extends Serializable {
+    public IJobletEventListener createListener(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 59072f8..0c9ea26 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -55,6 +55,8 @@
 
     private int maxAttempts;
 
+    private IJobletEventListenerFactory jobletEventListenerFactory;
+
     public JobSpecification() {
         roots = new ArrayList<OperatorDescriptorId>();
         opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
@@ -189,6 +191,14 @@
         return userConstraints;
     }
 
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
+    public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+    }
+
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
         List<V> vList = map.get(key);
         if (vList == null) {
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 413243e..bb112ed 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
@@ -61,6 +62,8 @@
 
     private final IWorkspaceFileFactory fileFactory;
 
+    private IJobletEventListener jobletEventListener;
+
     public Joblet(NodeControllerService nodeController, UUID jobId, int attempt, INCApplicationContext appCtx) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
@@ -210,4 +213,12 @@
             return counter;
         }
     }
+
+    public IJobletEventListener getJobletEventListener() {
+        return jobletEventListener;
+    }
+
+    public void setJobletEventListener(IJobletEventListener jobletEventListener) {
+        this.jobletEventListener = jobletEventListener;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index be48ae6..b2f2364 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -73,6 +73,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -231,7 +233,7 @@
                 }
             };
 
-            final Joblet joblet = getOrCreateLocalJoblet(jobId, attempt, appCtx);
+            final Joblet joblet = getOrCreateLocalJoblet(jobId, attempt, appCtx, plan);
 
             Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
             joblet.setStagelet(stageId, stagelet);
@@ -441,11 +443,18 @@
         return ji;
     }
 
-    private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+    private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx, JobPlan plan)
+            throws Exception {
         synchronized (jobletMap) {
             Joblet ji = jobletMap.get(jobId);
             if (ji == null || ji.getAttempt() != attempt) {
                 ji = new Joblet(this, jobId, attempt, appCtx);
+                IJobletEventListenerFactory jelf = plan.getJobSpecification().getJobletEventListenerFactory();
+                if (jelf != null) {
+                    IJobletEventListener listener = jelf.createListener(ji);
+                    ji.setJobletEventListener(listener);
+                    listener.jobletStart();
+                }
                 jobletMap.put(jobId, ji);
             }
             return ji;
@@ -463,6 +472,10 @@
         }
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {
+            IJobletEventListener listener = joblet.getJobletEventListener();
+            if (listener != null) {
+                listener.jobletFinish();
+            }
             joblet.close();
         }
         connectionManager.dumpStats();