Made open non-blocking

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1017 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 9403736..44e61f1 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -32,6 +32,8 @@
 
     private IFrameWriter delegate;
 
+    private boolean pendingConnection;
+
     private boolean failed;
 
     public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
@@ -57,32 +59,44 @@
     }
 
     @Override
-    public synchronized void open() throws HyracksDataException {
+    public void open() throws HyracksDataException {
         manager.registerPartition(pid, taId, this, PartitionState.STARTED);
         failed = false;
-        while (delegate == null) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-        delegate.open();
+        pendingConnection = true;
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        ensureConnected();
         delegate.nextFrame(buffer);
     }
 
+    private void ensureConnected() throws HyracksDataException {
+        if (pendingConnection) {
+            synchronized (this) {
+                while (delegate == null) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+            delegate.open();
+        }
+        pendingConnection = false;
+    }
+
     @Override
     public void fail() throws HyracksDataException {
+        ensureConnected();
         failed = true;
         delegate.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
+        ensureConnected();
         if (!failed) {
             manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
         }