Decrement the number of available frames after each read and check for the available frames count each time.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2983 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
index 4561dde..65ba1c7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
@@ -22,4 +22,6 @@
     public boolean failed();
 
     public int getNFramesAvailable();
+
+    public void notifyFrameRead();
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index 94779a6..314b621 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -135,7 +135,7 @@
                 }
             }
 
-            if (lastMonitor.eosReached()) {
+            if (lastMonitor.getNFramesAvailable() <= 0 && lastMonitor.eosReached()) {
                 if ((lastReadPartition == knownRecords.length - 1)) {
                     break;
                 } else {
@@ -164,6 +164,7 @@
                 }
             } else {
                 readBuffer = resultChannel.getNextBuffer();
+                lastMonitor.notifyFrameRead();
                 if (readBuffer != null) {
                     buffer.put(readBuffer);
                     buffer.flip();
@@ -251,5 +252,11 @@
         public synchronized int getNFramesAvailable() {
             return nAvailableFrames.get();
         }
+
+        @Override
+        public synchronized void notifyFrameRead() {
+            nAvailableFrames.decrementAndGet();
+        }
+
     }
 }