Merge fullstack_rsd_fixes from r3309:r3314.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3315 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 78bcf20..6419983 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -116,7 +116,7 @@
             }
         }
 
-        while (readSize <= 0 && !((lastReadPartition == knownRecords.length - 1) && (lastMonitor.eosReached()))) {
+        while (readSize <= 0 && !(isLastPartitionReadComplete())) {
             synchronized (lastMonitor) {
                 while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
                     try {
@@ -127,7 +127,7 @@
                 }
             }
 
-            if (lastMonitor.getNFramesAvailable() <= 0 && lastMonitor.eosReached()) {
+            if (isPartitionReadComplete(lastMonitor)) {
                 knownRecords[lastReadPartition].readEOS();
                 if ((lastReadPartition == knownRecords.length - 1)) {
                     break;
@@ -182,6 +182,14 @@
         return false;
     }
 
+    private boolean isPartitionReadComplete(IDatasetInputChannelMonitor monitor) {
+        return monitor.getNFramesAvailable() <= 0 && monitor.eosReached(); // no frames pending and EOS seen
+    }
+
+    private boolean isLastPartitionReadComplete() {
+        return (knownRecords.length - 1 == lastReadPartition) && isPartitionReadComplete(lastMonitor); // final partition drained
+    }
+
     private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
         NetworkAddress netAddr = addr.getNetworkAddress();
         return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index cdcdf4c..e648733 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -51,7 +51,7 @@
     }
 
     @Override
-    public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
         if (djr == null) {
             djr = new DatasetJobRecord();
@@ -188,7 +188,7 @@
         DatasetJobRecord djr = jobResultLocations.get(jobId);
 
         if (djr == null) {
-            throw new HyracksDataException("Requested JobId " + jobId + "doesn't exist");
+            throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
         }
 
         if (djr.getStatus() == Status.FAILED) {