Synchronize the hashmap access of result states correctly.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3293 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 1e58b5c..af9a607 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -72,16 +72,18 @@
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
- nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+ synchronized (partitionResultStateMap) {
+ ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+ nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
- ResultState[] resultStates = partitionResultStateMap.get(jobId);
- if (resultStates == null) {
- resultStates = new ResultState[nPartitions];
- partitionResultStateMap.put(jobId, resultStates);
+ ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ if (resultStates == null) {
+ resultStates = new ResultState[nPartitions];
+ partitionResultStateMap.put(jobId, resultStates);
+ }
+ resultStates[partition] = dpw.getResultState();
}
- resultStates[partition] = dpw.getResultState();
} catch (Exception e) {
throw new HyracksException(e);
}
@@ -110,15 +112,18 @@
@Override
public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
throws HyracksException {
- ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ ResultState resultState;
+ synchronized (partitionResultStateMap) {
+ ResultState[] resultStates = partitionResultStateMap.get(jobId);
- if (resultStates == null) {
- throw new HyracksException("Unknown JobId " + jobId);
- }
+ if (resultStates == null) {
+ throw new HyracksException("Unknown JobId " + jobId);
+ }
- ResultState resultState = resultStates[partition];
- if (resultState == null) {
- throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+ resultState = resultStates[partition];
+ if (resultState == null) {
+ throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+ }
}
IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);