commit | c9a82295fbadaa4f8075f72dea257fd5f55b018b | [log] [tgz] |
---|---|---|
author | madhusudancs@gmail.com <madhusudancs@gmail.com@123451ca-8445-de46-9d55-352943316053> | Sat Feb 23 11:48:11 2013 +0000 |
committer | madhusudancs@gmail.com <madhusudancs@gmail.com@123451ca-8445-de46-9d55-352943316053> | Sat Feb 23 11:48:11 2013 +0000 |
tree | b56c44f26060f7b2ab1b74622437ea9aad938d75 | |
parent | ca27294d9ee3d623e4ba5ffef26a56d5ee731434 [diff] |
Decrement the number of available frames after each read and check for the available frames count each time. git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2983 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java index 4561dde..65ba1c7 100644 --- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java +++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetInputChannelMonitor.java
@@ -22,4 +22,6 @@ public boolean failed(); public int getNFramesAvailable(); + + public void notifyFrameRead(); }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java index 94779a6..314b621 100644 --- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java +++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -135,7 +135,7 @@ } } - if (lastMonitor.eosReached()) { + if (lastMonitor.getNFramesAvailable() <= 0 && lastMonitor.eosReached()) { if ((lastReadPartition == knownRecords.length - 1)) { break; } else { @@ -164,6 +164,7 @@ } } else { readBuffer = resultChannel.getNextBuffer(); + lastMonitor.notifyFrameRead(); if (readBuffer != null) { buffer.put(readBuffer); buffer.flip(); @@ -251,5 +252,11 @@ public synchronized int getNFramesAvailable() { return nAvailableFrames.get(); } + + @Override + public synchronized void notifyFrameRead() { + nAvailableFrames.decrementAndGet(); + } + } }