Merge fullstack_rsd_fixes from r3309:r3314.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3315 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 78bcf20..6419983 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -116,7 +116,7 @@
}
}
- while (readSize <= 0 && !((lastReadPartition == knownRecords.length - 1) && (lastMonitor.eosReached()))) {
+ while (readSize <= 0 && !(isLastPartitionReadComplete())) {
synchronized (lastMonitor) {
while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
try {
@@ -127,7 +127,7 @@
}
}
- if (lastMonitor.getNFramesAvailable() <= 0 && lastMonitor.eosReached()) {
+ if (isPartitionReadComplete(lastMonitor)) {
knownRecords[lastReadPartition].readEOS();
if ((lastReadPartition == knownRecords.length - 1)) {
break;
@@ -182,6 +182,14 @@
return false;
}
+ private boolean isPartitionReadComplete(IDatasetInputChannelMonitor monitor) {
+ return (monitor.getNFramesAvailable() <= 0) && (monitor.eosReached());
+ }
+
+ private boolean isLastPartitionReadComplete() {
+ return ((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor));
+ }
+
private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
NetworkAddress netAddr = addr.getNetworkAddress();
return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index cdcdf4c..e648733 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -51,7 +51,7 @@
}
@Override
- public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
DatasetJobRecord djr = jobResultLocations.get(jobId);
if (djr == null) {
djr = new DatasetJobRecord();
@@ -188,7 +188,7 @@
DatasetJobRecord djr = jobResultLocations.get(jobId);
if (djr == null) {
- throw new HyracksDataException("Requested JobId " + jobId + "doesn't exist");
+ throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
}
if (djr.getStatus() == Status.FAILED) {