Rewrite the dataset directory service.

The changes are:
1. Use a class for metadata instead of keeping it in pairs.
2. Store the serialized record descriptor received per job in the dataset
   directory records.
3. Implement a method to return the serialized record descriptor when
   requested by the client.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2830 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ef0bb1d..9cbbb6b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -18,8 +18,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.lang3.tuple.Pair;
-
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
@@ -35,29 +33,29 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private final Map<JobId, Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>> jobResultLocationsMap;
+    private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
 
     public DatasetDirectoryService() {
-        jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>>();
+        jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
     }
 
     @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId,
-            int partition, int nPartitions, NetworkAddress networkAddress) {
-        Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
+    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+            byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress) {
+        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
         if (rsMap == null) {
-            rsMap = new HashMap<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>();
+            rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
             jobResultLocationsMap.put(jobId, rsMap);
         }
 
-        Pair<Boolean, DatasetDirectoryRecord[]> resultSetPair = rsMap.get(rsId);
-        if (resultSetPair == null) {
-            resultSetPair = Pair.<Boolean, DatasetDirectoryRecord[]> of(orderedResult,
-                    new DatasetDirectoryRecord[nPartitions]);
-            rsMap.put(rsId, resultSetPair);
+        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+        if (resultSetMetaData == null) {
+            resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions],
+                    serializedRecordDescriptor);
+            rsMap.put(rsId, resultSetMetaData);
         }
 
-        DatasetDirectoryRecord[] records = resultSetPair.getRight();
+        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
         if (records[partition] == null) {
             records[partition] = new DatasetDirectoryRecord();
         }
@@ -66,6 +64,25 @@
     }
 
     @Override
+    public synchronized byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException {
+        Map<ResultSetId, ResultSetMetaData> rsMap;
+        while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+            throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+        }
+
+        return resultSetMetaData.getSerializedRecordDescriptor();
+    }
+
+    @Override
     public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
         DatasetDirectoryRecord[] newRecords;
@@ -113,18 +130,18 @@
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
+        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
         if (rsMap == null) {
             return null;
         }
 
-        Pair<Boolean, DatasetDirectoryRecord[]> resultSetPair = rsMap.get(rsId);
-        if (resultSetPair == null || resultSetPair.getRight() == null) {
+        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
             throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
         }
 
-        boolean ordered = resultSetPair.getLeft();
-        DatasetDirectoryRecord[] records = resultSetPair.getRight();
+        boolean ordered = resultSetMetaData.getOrderedResult();
+        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
         /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
          * we can simply check if there are any newly discovered records and send the whole array back if there are.
          */
@@ -155,4 +172,30 @@
         }
         return null;
     }
+
+    private class ResultSetMetaData {
+        private final boolean ordered;
+
+        private final DatasetDirectoryRecord[] records;
+
+        private final byte[] serializedRecordDescriptor;
+
+        public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records, byte[] serializedRecordDescriptor) {
+            this.ordered = ordered;
+            this.records = records;
+            this.serializedRecordDescriptor = serializedRecordDescriptor;
+        }
+
+        public boolean getOrderedResult() {
+            return ordered;
+        }
+
+        public DatasetDirectoryRecord[] getRecords() {
+            return records;
+        }
+
+        public byte[] getSerializedRecordDescriptor() {
+            return serializedRecordDescriptor;
+        }
+    }
 }
\ No newline at end of file