Rewrite the dataset directory service.
The changes are:
1. Use a dedicated ResultSetMetaData class for result set metadata instead of keeping it in Pair objects.
2. Store the serialized record descriptor passed in at result partition
registration in the per-result-set metadata kept by the dataset directory.
3. Implement getRecordDescriptor(), which returns the serialized record
descriptor of a given job's result set when requested by the client.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2830 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ef0bb1d..9cbbb6b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -18,8 +18,6 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang3.tuple.Pair;
-
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
@@ -35,29 +33,29 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
- private final Map<JobId, Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>> jobResultLocationsMap;
+ private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
public DatasetDirectoryService() {
- jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>>();
+ jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
}
@Override
- public synchronized void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId,
- int partition, int nPartitions, NetworkAddress networkAddress) {
- Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
+ public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress) {
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
if (rsMap == null) {
- rsMap = new HashMap<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>();
+ rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
jobResultLocationsMap.put(jobId, rsMap);
}
- Pair<Boolean, DatasetDirectoryRecord[]> resultSetPair = rsMap.get(rsId);
- if (resultSetPair == null) {
- resultSetPair = Pair.<Boolean, DatasetDirectoryRecord[]> of(orderedResult,
- new DatasetDirectoryRecord[nPartitions]);
- rsMap.put(rsId, resultSetPair);
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null) {
+ resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions],
+ serializedRecordDescriptor);
+ rsMap.put(rsId, resultSetMetaData);
}
- DatasetDirectoryRecord[] records = resultSetPair.getRight();
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
if (records[partition] == null) {
records[partition] = new DatasetDirectoryRecord();
}
@@ -66,6 +64,25 @@
}
@Override
+ public synchronized byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException {
+ Map<ResultSetId, ResultSetMetaData> rsMap;
+ while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
+ throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+ }
+
+ return resultSetMetaData.getSerializedRecordDescriptor();
+ }
+
+ @Override
public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
DatasetDirectoryRecord[] newRecords;
@@ -113,18 +130,18 @@
*/
private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
- Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
+ Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
if (rsMap == null) {
return null;
}
- Pair<Boolean, DatasetDirectoryRecord[]> resultSetPair = rsMap.get(rsId);
- if (resultSetPair == null || resultSetPair.getRight() == null) {
+ ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
}
- boolean ordered = resultSetPair.getLeft();
- DatasetDirectoryRecord[] records = resultSetPair.getRight();
+ boolean ordered = resultSetMetaData.getOrderedResult();
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
/* If ordering is required, we should expose the dataset directory records only in the order, otherwise
* we can simply check if there are any newly discovered records and send the whole array back if there are.
*/
@@ -155,4 +172,30 @@
}
return null;
}
+
+ private class ResultSetMetaData {
+ private final boolean ordered;
+
+ private final DatasetDirectoryRecord[] records;
+
+ private final byte[] serializedRecordDescriptor;
+
+ public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records, byte[] serializedRecordDescriptor) {
+ this.ordered = ordered;
+ this.records = records;
+ this.serializedRecordDescriptor = serializedRecordDescriptor;
+ }
+
+ public boolean getOrderedResult() {
+ return ordered;
+ }
+
+ public DatasetDirectoryRecord[] getRecords() {
+ return records;
+ }
+
+ public byte[] getSerializedRecordDescriptor() {
+ return serializedRecordDescriptor;
+ }
+ }
}
\ No newline at end of file