Added pipelining while materializing partition data

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@727 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index e579da7..b742316 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 
 public class MaterializedPartitionInputChannel implements IInputChannel {
@@ -84,7 +85,7 @@
 
     @Override
     public void open() throws HyracksDataException {
-        MaterializedPartition partition = (MaterializedPartition) manager.getPartition(pid);
+        IPartition partition = manager.getPartition(pid);
         partition.writeTo(writer);
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index b989805..1a1b15f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -33,15 +33,15 @@
 public class MaterializedPartitionWriter implements IFrameWriter {
     private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
 
-    protected final IHyracksRootContext ctx;
+    private final IHyracksRootContext ctx;
 
-    protected final PartitionManager manager;
+    private final PartitionManager manager;
 
-    protected final PartitionId pid;
+    private final PartitionId pid;
 
-    protected final TaskAttemptId taId;
+    private final TaskAttemptId taId;
 
-    protected final Executor executor;
+    private final Executor executor;
 
     private FileReference fRef;
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
new file mode 100644
index 0000000..143fd2c
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializingPipelinedPartition implements IFrameWriter, IPartition {
+    private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
+
+    private final IHyracksRootContext ctx;
+
+    private final Executor executor;
+
+    private final IOManager ioManager;
+
+    private final PartitionManager manager;
+
+    private final PartitionId pid;
+
+    private final TaskAttemptId taId;
+
+    private FileReference fRef;
+
+    private FileHandle handle;
+
+    private long size;
+
+    private boolean eos;
+
+    private boolean failed;
+
+    public MaterializingPipelinedPartition(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+            TaskAttemptId taId, Executor executor) {
+        this.ctx = ctx;
+        this.executor = executor;
+        this.ioManager = (IOManager) ctx.getIOManager();
+        this.manager = manager;
+        this.pid = pid;
+        this.taId = taId;
+    }
+
+    @Override
+    public void deallocate() {
+        fRef.delete();
+    }
+
+    @Override
+    public void writeTo(final IFrameWriter writer) {
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    FileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                            IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    try {
+                        writer.open();
+                        try {
+                            long offset = 0;
+                            ByteBuffer buffer = ctx.allocateFrame();
+                            boolean fail = false;
+                            boolean done = false;
+                            while (!fail && !done) {
+                                synchronized (MaterializingPipelinedPartition.this) {
+                                    while (offset >= size && !eos && !failed) {
+                                        try {
+                                            MaterializingPipelinedPartition.this.wait();
+                                        } catch (InterruptedException e) {
+                                            throw new HyracksDataException(e);
+                                        }
+                                    }
+                                    fail = failed;
+                                    done = eos && offset >= size;
+                                }
+                                if (fail) {
+                                    writer.fail();
+                                } else if (!done) {
+                                    buffer.clear();
+                                    long readLen = ioManager.syncRead(fh, offset, buffer);
+                                    if (readLen < buffer.capacity()) {
+                                        throw new HyracksDataException("Premature end of file");
+                                    }
+                                    offset += readLen;
+                                    buffer.flip();
+                                    writer.nextFrame(buffer);
+                                }
+                            }
+                        } finally {
+                            writer.close();
+                        }
+                    } finally {
+                        ioManager.close(fh);
+                    }
+                } catch (HyracksDataException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+
+    @Override
+    public boolean isReusable() {
+        return true;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("open(" + pid + " by " + taId);
+        }
+        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        size = 0;
+        eos = false;
+        failed = false;
+        manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+    }
+
+    @Override
+    public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        size += ctx.getIOManager().syncWrite(handle, size, buffer);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void fail() throws HyracksDataException {
+        failed = true;
+        notifyAll();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("close(" + pid + " by " + taId);
+        }
+        boolean commit = false;
+        synchronized (this) {
+            eos = true;
+            ctx.getIOManager().close(handle);
+            handle = null;
+            commit = !failed;
+            notifyAll();
+        }
+        if (commit) {
+            manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index ed83451..51c829a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -46,6 +46,7 @@
 import edu.uci.ics.hyracks.control.nc.Task;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
 import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
 import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
 
@@ -182,14 +183,25 @@
     private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
             final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
         if (cPolicy.materializeOnSendSide()) {
-            return new IPartitionWriterFactory() {
-                @Override
-                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
-                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
-                            ncs.getExecutor());
-                }
-            };
+            if (cPolicy.consumerWaitsForProducerToFinish()) {
+                return new IPartitionWriterFactory() {
+                    @Override
+                    public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                        return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
+                    }
+                };
+            } else {
+                return new IPartitionWriterFactory() {
+                    @Override
+                    public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                        return new MaterializingPipelinedPartition(ncs.getRootContext(), ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
+                    }
+                };
+            }
         } else {
             return new IPartitionWriterFactory() {
                 @Override