Implement the client side of the RPC chain to fetch the job status from the client.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2842 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index f27a44e..ddf542e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -33,6 +33,7 @@
         GET_JOB_STATUS,
         START_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
+        GET_DATASET_RESULT_STATUS,
         GET_DATASET_RECORD_DESCRIPTOR,
         GET_DATASET_RESULT_LOCATIONS,
         WAIT_FOR_COMPLETION,
@@ -170,6 +171,32 @@
         }
     }
 
+    public static class GetDatasetResultStatusFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        private final ResultSetId rsId;
+
+        public GetDatasetResultStatusFunction(JobId jobId, ResultSetId rsId) {
+            this.jobId = jobId;
+            this.rsId = rsId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_DATASET_RESULT_STATUS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+    }
+
     public static class GetDatasetRecordDescriptorFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 9217858..1325150 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.dataset;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -26,6 +27,8 @@
 
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
 
+    public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
+
     public byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException;
 
     public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
index 23b15f6..b437975 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
@@ -17,12 +17,15 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDataset {
     public void open(JobId jobId, ResultSetId resultSetId) throws IOException;
 
+    public Status getResultStatus();
+
     public byte[] getSerializedRecordDescriptor();
 
     public int read(ByteBuffer buffer) throws HyracksDataException;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index beb10bd..8bde85e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -14,10 +14,23 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceConnection {
     /**
+     * Gets the result status for the given result set.
+     * 
+     * @param jobId
+     *            ID of the job
+     * @param rsId
+     *            ID of the result set
+     * @return {@link Status}
+     * @throws Exception
+     */
+    public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
+
+    /**
      * Gets the record descriptor for the given result set.
      * 
      * @param jobId
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index 12fde5c..110c87e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -14,10 +14,23 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceInterface {
     /**
+     * Gets the result status for the given result set.
+     * 
+     * @param jobId
+     *            ID of the job
+     * @param rsId
+     *            ID of the result set
+     * @return {@link Status}
+     * @throws Exception
+     */
+    public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
+
+    /**
      * Gets the record descriptor for the given result set.
      * 
      * @param jobId
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index c11ae59..5ab2ca5 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
@@ -86,6 +87,17 @@
     }
 
     @Override
+    public Status getResultStatus() {
+        Status status = null;
+        try {
+            status = datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
+        } catch (Exception e) {
+            // TODO(madhusudancs): Decide what to do in case of error
+        }
+        return status;
+    }
+
+    @Override
     public byte[] getSerializedRecordDescriptor() {
         byte[] serializedRecordDescriptor = null;
         try {
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 1050983..ad76e98 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -17,6 +17,7 @@
 import java.net.InetSocketAddress;
 
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
@@ -40,6 +41,11 @@
     }
 
     @Override
+    public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception {
+        return ddsi.getDatasetResultStatus(jobId, rsId);
+    }
+
+    @Override
     public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception {
         return ddsi.getDatasetSerializedRecordDescriptorFunction(jobId, rsId);
     }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 9de26dd..dfcfdee 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -16,6 +16,7 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index f968612..93c9685 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -58,6 +59,7 @@
 import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetRecordDescriptorWork;
 import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
+import edu.uci.ics.hyracks.control.cc.work.GetResultStatusWork;
 import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
 import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -348,6 +350,13 @@
                     return;
                 }
 
+                case GET_DATASET_RESULT_STATUS: {
+                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+                    workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
+                            .getResultSetId(), new IPCResponder<Status>(handle, mid)));
+                    return;
+                }
+
                 case GET_DATASET_RECORD_DESCRIPTOR: {
                     HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction gdrdf = (HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction) fn;
                     workQueue.schedule(new GetRecordDescriptorWork(ClusterControllerService.this, gdrdf.getJobId(),
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 2505031..35c5401 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -14,12 +14,14 @@
  */
 package edu.uci.ics.hyracks.control.cc.dataset;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -76,6 +78,51 @@
     }
 
     @Override
+    public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
+        Map<ResultSetId, ResultSetMetaData> rsMap;
+        while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+            throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+        }
+        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+
+        ArrayList<Status> statuses = new ArrayList<Status>(records.length);
+        for (int i = 0; i < records.length; i++) {
+            statuses.add(records[i].getStatus());
+        }
+
+        // Default status is idle
+        Status status = Status.IDLE;
+        if (statuses.contains(Status.FAILED)) {
+            // Even if there is at least one failed entry we should return failed status.
+            return Status.FAILED;
+        } else if (statuses.contains(Status.RUNNING)) {
+            // If there are not failed entry and if there is at least one running entry we should return running status.
+            return Status.RUNNING;
+        } else {
+            // If each and every partition has reported success do we report success as the status.
+            int successCount = 0;
+            for (int i = 0; i < statuses.size(); i++) {
+                if (statuses.get(i) == Status.SUCCESS) {
+                    successCount++;
+                }
+            }
+            if (successCount == statuses.size()) {
+                return Status.SUCCESS;
+            }
+        }
+        return status;
+    }
+
+    @Override
     public synchronized byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException {
         Map<ResultSetId, ResultSetMetaData> rsMap;
         while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
new file mode 100644
index 0000000..d2dadf5
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetResultStatusWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+
+    private final JobId jobId;
+
+    private final ResultSetId rsId;
+
+    private final IResultCallback<Status> callback;
+
+    public GetResultStatusWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            IResultCallback<Status> callback) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.rsId = rsId;
+        this.callback = callback;
+    }
+
+    @Override
+    public void doRun() {
+        try {
+            Status status = ccs.getDatasetDirectoryService().getResultStatus(jobId, rsId);
+            callback.setValue(status);
+        } catch (HyracksDataException e) {
+            callback.setException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JobId@" + jobId + " ResultSetId@" + rsId;
+    }
+}