Synchronize the hashmap access of result states correctly.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3293 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 1e58b5c..af9a607 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -72,16 +72,18 @@
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
-            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
-                    nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+            synchronized (partitionResultStateMap) {
+                ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+                        nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
 
-            ResultState[] resultStates = partitionResultStateMap.get(jobId);
-            if (resultStates == null) {
-                resultStates = new ResultState[nPartitions];
-                partitionResultStateMap.put(jobId, resultStates);
+                ResultState[] resultStates = partitionResultStateMap.get(jobId);
+                if (resultStates == null) {
+                    resultStates = new ResultState[nPartitions];
+                    partitionResultStateMap.put(jobId, resultStates);
+                }
+                resultStates[partition] = dpw.getResultState();
             }
-            resultStates[partition] = dpw.getResultState();
         } catch (Exception e) {
             throw new HyracksException(e);
         }
@@ -110,15 +112,18 @@
     @Override
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
             throws HyracksException {
-        ResultState[] resultStates = partitionResultStateMap.get(jobId);
+        ResultState resultState;
+        synchronized (partitionResultStateMap) {
+            ResultState[] resultStates = partitionResultStateMap.get(jobId);
 
-        if (resultStates == null) {
-            throw new HyracksException("Unknown JobId " + jobId);
-        }
+            if (resultStates == null) {
+                throw new HyracksException("Unknown JobId " + jobId);
+            }
 
-        ResultState resultState = resultStates[partition];
-        if (resultState == null) {
-            throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+            resultState = resultStates[partition];
+            if (resultState == null) {
+                throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+            }
         }
 
         IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);