Enforce ordering in exposing the directory records for partitions to the clients if ordering constraint is imposed by the ResultWriterOperatoryDescriptor.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2550 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e0f1600..ef0bb1d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -68,26 +68,91 @@
@Override
public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
- while (!newRecords(jobId, rsId, knownRecords)) {
+ DatasetDirectoryRecord[] newRecords;
+ while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
try {
wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
- return jobResultLocationsMap.get(jobId).get(rsId).getRight();
+ return newRecords;
}
- private boolean newRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
+ /**
+ * Compares the records already known by the client for the given job's result set id with the records that the
+ * dataset directory service knows and if there are any newly discovered records returns a whole array with the
+ * new records filled in.
+ * This method has a very convoluted logic. Here is the explanation of how it works.
+ * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in
+ * the order of the partitions. It always traverses the array in the first to last order!
+ * If known records array or the first element in that array is null in the but the record for that partition now
+ * known to the directory service, the method fills in that record in the array and returns the array back.
+ * However, if the first known null record is not a first element in the array, by induction, all the previous
+ * known records should be known already be known to client and none of the records for the partitions ahead is
+ * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding
+ * to the record before the first known null record, i.e. the last known non-null record. If not, we just return
+ * null because we cannot expose any new locations until the client reaches end of stream for the last known record.
+ * If the client has reached the end of stream record for the last known non-null record, we check if the next record
+ * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or
+ * send null otherwise.
+ * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
+ * arrays are equal and if they are not we send the entire new updated array.
+ *
+ * @param jobId
+ * - Id of the job for which the directory records should be retrieved.
+ * @param rsId
+ * - Id of the result set for which the directory records should be retrieved.
+ * @param knownRecords
+ * - An array of directory records that the client is already aware of.
+ * @return
+ * - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint
+ * @throws HyracksDataException
+ * TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
+ * every check. This already looks very expensive.
+ */
+ private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
if (rsMap == null) {
- return false;
+ return null;
}
+
Pair<Boolean, DatasetDirectoryRecord[]> resultSetPair = rsMap.get(rsId);
if (resultSetPair == null || resultSetPair.getRight() == null) {
throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
}
- return !Arrays.equals(resultSetPair.getRight(), knownRecords);
+
+ boolean ordered = resultSetPair.getLeft();
+ DatasetDirectoryRecord[] records = resultSetPair.getRight();
+ /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
+ * we can simply check if there are any newly discovered records and send the whole array back if there are.
+ */
+ if (ordered) {
+ // Iterate over the known records and find the last record which is not null.
+ int i = 0;
+ for (i = 0; i < records.length; i++) {
+ if (knownRecords == null) {
+ if (records[0] != null) {
+ knownRecords = new DatasetDirectoryRecord[records.length];
+ knownRecords[0] = records[0];
+ return knownRecords;
+ }
+ return null;
+ }
+ if (knownRecords[i] == null) {
+ if ((i == 0 || knownRecords[i - 1].getEOS()) && records[i] != null) {
+ knownRecords[i] = records[i];
+ return knownRecords;
+ }
+ return null;
+ }
+ }
+ } else {
+ if (!Arrays.equals(records, knownRecords)) {
+ return records;
+ }
+ }
+ return null;
}
}
\ No newline at end of file