Enforce ordering in exposing the directory records for partitions to the clients if ordering constraint is imposed by the ResultWriterOperatoryDescriptor.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2550 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e0f1600..ef0bb1d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -68,26 +68,91 @@
     @Override
     public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
-        while (!newRecords(jobId, rsId, knownRecords)) {
+        DatasetDirectoryRecord[] newRecords;
+        while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
             try {
                 wait();
             } catch (InterruptedException e) {
                 throw new HyracksDataException(e);
             }
         }
-        return jobResultLocationsMap.get(jobId).get(rsId).getRight();
+        return newRecords;
     }
 
-    private boolean newRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
+    /**
+     * Compares the records already known by the client for the given job's result set id with the records that the
+     * dataset directory service knows and if there are any newly discovered records returns a whole array with the
+     * new records filled in.
+     * This method has a very convoluted logic. Here is the explanation of how it works.
+     * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in
+     * the order of the partitions. It always traverses the array in the first to last order!
+     * If known records array or the first element in that array is null in the but the record for that partition now
+     * known to the directory service, the method fills in that record in the array and returns the array back.
+     * However, if the first known null record is not a first element in the array, by induction, all the previous
+     * known records should be known already be known to client and none of the records for the partitions ahead is
+     * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding
+     * to the record before the first known null record, i.e. the last known non-null record. If not, we just return
+     * null because we cannot expose any new locations until the client reaches end of stream for the last known record.
+     * If the client has reached the end of stream record for the last known non-null record, we check if the next record
+     * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or
+     * send null otherwise.
+     * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
+     * arrays are equal and if they are not we send the entire new updated array.
+     * 
+     * @param jobId
+     *            - Id of the job for which the directory records should be retrieved.
+     * @param rsId
+     *            - Id of the result set for which the directory records should be retrieved.
+     * @param knownRecords
+     *            - An array of directory records that the client is already aware of.
+     * @return
+     *         - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint
+     * @throws HyracksDataException
+     *             TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
+     *             every check. This already looks very expensive.
+     */
+    private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
         Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
         if (rsMap == null) {
-            return false;
+            return null;
         }
+
         Pair<Boolean, DatasetDirectoryRecord[]> resultSetPair = rsMap.get(rsId);
         if (resultSetPair == null || resultSetPair.getRight() == null) {
             throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
         }
-        return !Arrays.equals(resultSetPair.getRight(), knownRecords);
+
+        boolean ordered = resultSetPair.getLeft();
+        DatasetDirectoryRecord[] records = resultSetPair.getRight();
+        /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
+         * we can simply check if there are any newly discovered records and send the whole array back if there are.
+         */
+        if (ordered) {
+            // Iterate over the known records and find the last record which is not null.
+            int i = 0;
+            for (i = 0; i < records.length; i++) {
+                if (knownRecords == null) {
+                    if (records[0] != null) {
+                        knownRecords = new DatasetDirectoryRecord[records.length];
+                        knownRecords[0] = records[0];
+                        return knownRecords;
+                    }
+                    return null;
+                }
+                if (knownRecords[i] == null) {
+                    if ((i == 0 || knownRecords[i - 1].getEOS()) && records[i] != null) {
+                        knownRecords[i] = records[i];
+                        return knownRecords;
+                    }
+                    return null;
+                }
+            }
+        } else {
+            if (!Arrays.equals(records, knownRecords)) {
+                return records;
+            }
+        }
+        return null;
     }
 }
\ No newline at end of file