Made open non-blocking
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1017 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 9403736..44e61f1 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -32,6 +32,8 @@
private IFrameWriter delegate;
+ private boolean pendingConnection;
+
private boolean failed;
public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
@@ -57,32 +59,44 @@
}
@Override
- public synchronized void open() throws HyracksDataException {
+ public void open() throws HyracksDataException {
manager.registerPartition(pid, taId, this, PartitionState.STARTED);
failed = false;
- while (delegate == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- delegate.open();
+ pendingConnection = true;
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ensureConnected();
delegate.nextFrame(buffer);
}
+ private void ensureConnected() throws HyracksDataException {
+ if (pendingConnection) {
+ synchronized (this) {
+ while (delegate == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ delegate.open();
+ }
+ pendingConnection = false;
+ }
+
@Override
public void fail() throws HyracksDataException {
+ ensureConnected();
failed = true;
delegate.fail();
}
@Override
public void close() throws HyracksDataException {
+ ensureConnected();
if (!failed) {
manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
}