Added pipelining while materializing partition data
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@727 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index e579da7..b742316 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class MaterializedPartitionInputChannel implements IInputChannel {
@@ -84,7 +85,7 @@
@Override
public void open() throws HyracksDataException {
- MaterializedPartition partition = (MaterializedPartition) manager.getPartition(pid);
+ IPartition partition = manager.getPartition(pid);
partition.writeTo(writer);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index b989805..1a1b15f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -33,15 +33,15 @@
public class MaterializedPartitionWriter implements IFrameWriter {
private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
- protected final IHyracksRootContext ctx;
+ private final IHyracksRootContext ctx;
- protected final PartitionManager manager;
+ private final PartitionManager manager;
- protected final PartitionId pid;
+ private final PartitionId pid;
- protected final TaskAttemptId taId;
+ private final TaskAttemptId taId;
- protected final Executor executor;
+ private final Executor executor;
private FileReference fRef;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
new file mode 100644
index 0000000..143fd2c
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializingPipelinedPartition implements IFrameWriter, IPartition {
+ private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
+
+ private final IHyracksRootContext ctx;
+
+ private final Executor executor;
+
+ private final IOManager ioManager;
+
+ private final PartitionManager manager;
+
+ private final PartitionId pid;
+
+ private final TaskAttemptId taId;
+
+ private FileReference fRef;
+
+ private FileHandle handle;
+
+ private long size;
+
+ private boolean eos;
+
+ private boolean failed;
+
+ public MaterializingPipelinedPartition(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+ TaskAttemptId taId, Executor executor) {
+ this.ctx = ctx;
+ this.executor = executor;
+ this.ioManager = (IOManager) ctx.getIOManager();
+ this.manager = manager;
+ this.pid = pid;
+ this.taId = taId;
+ }
+
+ @Override
+ public void deallocate() {
+ fRef.delete();
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ FileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ writer.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ boolean fail = false;
+ boolean done = false;
+ while (!fail && !done) {
+ synchronized (MaterializingPipelinedPartition.this) {
+ while (offset >= size && !eos && !failed) {
+ try {
+ MaterializingPipelinedPartition.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ fail = failed;
+ done = eos && offset >= size;
+ }
+ if (fail) {
+ writer.fail();
+ } else if (!done) {
+ buffer.clear();
+ long readLen = ioManager.syncRead(fh, offset, buffer);
+ if (readLen < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += readLen;
+ buffer.flip();
+ writer.nextFrame(buffer);
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ } finally {
+ ioManager.close(fh);
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean isReusable() {
+ return true;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + pid + " by " + taId);
+ }
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ eos = false;
+ failed = false;
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ }
+
+ @Override
+ public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void fail() throws HyracksDataException {
+ failed = true;
+ notifyAll();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + pid + " by " + taId);
+ }
+ boolean commit = false;
+ synchronized (this) {
+ eos = true;
+ ctx.getIOManager().close(handle);
+ handle = null;
+ commit = !failed;
+ notifyAll();
+ }
+ if (commit) {
+ manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index ed83451..51c829a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -46,6 +46,7 @@
import edu.uci.ics.hyracks.control.nc.Task;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
@@ -182,14 +183,25 @@
private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
if (cPolicy.materializeOnSendSide()) {
- return new IPartitionWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
- new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
- ncs.getExecutor());
- }
- };
+ if (cPolicy.consumerWaitsForProducerToFinish()) {
+ return new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
+ }
+ };
+ } else {
+ return new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializingPipelinedPartition(ncs.getRootContext(), ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
+ }
+ };
+ }
} else {
return new IPartitionWriterFactory() {
@Override