Merged -r572:573 from trunk

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@836 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
new file mode 100644
index 0000000..bfb70ea
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public interface IJobletEventListener {
+    public void jobletStart();
+
+    public void jobletFinish();
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
new file mode 100644
index 0000000..3b3cacb
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IJobletEventListenerFactory extends Serializable {
+    public IJobletEventListener createListener(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index ef44033..23b33f0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -58,6 +58,8 @@
 
     private int maxAttempts;
 
+    private IJobletEventListenerFactory jobletEventListenerFactory;
+
     private transient int operatorIdCounter;
 
     private transient int connectorIdCounter;
@@ -215,6 +217,14 @@
         return userConstraints;
     }
 
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
+    public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+    }
+
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
         List<V> vList = map.get(key);
         if (vList == null) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 381c44b..7d39bfd 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
@@ -72,6 +73,8 @@
 
     private final IWorkspaceFileFactory fileFactory;
 
+    private IJobletEventListener jobletEventListener;
+
     public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx) {
         this.nodeController = nodeController;
         this.appCtx = appCtx;
@@ -235,4 +238,12 @@
             collector.addPartitions(Collections.singleton(channel));
         }
     }
+
+    public IJobletEventListener getJobletEventListener() {
+        return jobletEventListener;
+    }
+
+    public void setJobletEventListener(IJobletEventListener jobletEventListener) {
+        this.jobletEventListener = jobletEventListener;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 6549a74..e8c4052 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -56,7 +56,6 @@
                     task.abort();
                 }
             }
-            ji.close();
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
index 50c05df..7ae27ba 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
@@ -18,6 +18,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
@@ -43,6 +44,10 @@
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet joblet = jobletMap.remove(jobId);
         if (joblet != null) {
+            IJobletEventListener listener = joblet.getJobletEventListener();
+            if (listener != null) {
+                listener.jobletFinish();
+            }
             ncs.getPartitionManager().unregisterPartitions(jobId);
             joblet.close();
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 153eaff..034be8b 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -98,7 +100,7 @@
                 }
             };
 
-            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
+            final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jag);
 
             for (TaskAttemptDescriptor td : taskDescriptors) {
                 TaskAttemptId taId = td.getTaskAttemptId();
@@ -157,11 +159,18 @@
         }
     }
 
-    private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx) throws Exception {
+    private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag)
+            throws Exception {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
             ji = new Joblet(ncs, jobId, appCtx);
+            IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
+            if (jelf != null) {
+                IJobletEventListener listener = jelf.createListener(ji);
+                ji.setJobletEventListener(listener);
+                listener.jobletStart();
+            }
             jobletMap.put(jobId, ji);
         }
         return ji;