Synchronize reads and writes to not let the reader attempt to read the file before it is written by the writer.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2525 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 60aafc2..ba5a2cf 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -42,6 +43,8 @@
private final Executor executor;
+ private final AtomicBoolean eos;
+
private FileReference fRef;
private IFileHandle handle;
@@ -56,6 +59,7 @@
this.manager = manager;
this.partition = partition;
this.executor = executor;
+ eos = new AtomicBoolean(false);
}
@Override
@@ -71,8 +75,9 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ notifyAll();
}
@Override
@@ -81,10 +86,12 @@
}
@Override
- public void close() throws HyracksDataException {
+ public synchronized void close() throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("close(" + partition + ")");
}
+ eos.set(true);
+ notifyAll();
/* TODO(madhusudancs): Do something more intelligent here than closing the file handle because read still
* wants it :-P
*/
@@ -96,7 +103,14 @@
return ctx;
}
- private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ private synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ while (offset >= size && !eos.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
return ctx.getIOManager().syncRead(handle, offset, buffer);
}