Synchronize reads and writes to not let the reader attempt to read the file before it is written by the writer.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2525 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 60aafc2..ba5a2cf 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -42,6 +43,8 @@
 
     private final Executor executor;
 
+    private final AtomicBoolean eos;
+
     private FileReference fRef;
 
     private IFileHandle handle;
@@ -56,6 +59,7 @@
         this.manager = manager;
         this.partition = partition;
         this.executor = executor;
+        eos = new AtomicBoolean(false);
     }
 
     @Override
@@ -71,8 +75,9 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         size += ctx.getIOManager().syncWrite(handle, size, buffer);
+        notifyAll();
     }
 
     @Override
@@ -81,10 +86,12 @@
     }
 
     @Override
-    public void close() throws HyracksDataException {
+    public synchronized void close() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
+        eos.set(true);
+        notifyAll();
         /* TODO(madhusudancs): Do something more intelligent here than closing the file handle because read still
          * wants it :-P
          */
@@ -96,7 +103,14 @@
         return ctx;
     }
 
-    private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+    private synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+        while (offset >= size && !eos.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
         return ctx.getIOManager().syncRead(handle, offset, buffer);
     }