Complete the implementation of basic DatasetPartitionWriter class to work with the network layer.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2517 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 0ce4ef3..60aafc2 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -15,26 +15,132 @@
package edu.uci.ics.hyracks.control.nc.dataset;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-public class DatasetPartitionWriter implements IFrameWriter {
+public class DatasetPartitionWriter implements IFrameWriter, IPartition {
+ private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
+
+ private static final String FILE_PREFIX = "result_";
+
+ private final IHyracksTaskContext ctx;
+
+ private final IDatasetPartitionManager manager;
+
+ private final int partition;
+
+ private final Executor executor;
+
+ private FileReference fRef;
+
+ private IFileHandle handle;
+
+ private long size;
+
+ private boolean failed;
+
+ public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, int partition,
+ Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.partition = partition;
+ this.executor = executor;
+ }
@Override
public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + partition + ")");
+ }
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(FILE_PREFIX + String.valueOf(partition));
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ failed = false;
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
}
@Override
public void fail() throws HyracksDataException {
+ failed = true;
}
@Override
public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + partition + ")");
+ }
+ /* TODO(madhusudancs): Do something more intelligent here than closing the file handle because read still
+ * wants it :-P
+ */
+ // ctx.getIOManager().close(handle);
}
+ @Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ return ctx.getIOManager().syncRead(handle, offset, buffer);
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ NetworkOutputChannel channel = (NetworkOutputChannel) writer;
+ channel.setTaskContext(ctx);
+ try {
+ channel.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ while (true) {
+ buffer.clear();
+ long size = read(offset, buffer);
+ if (size < 0) {
+ break;
+ } else if (size < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += size;
+ buffer.flip();
+ channel.nextFrame(buffer);
+ }
+ } finally {
+ channel.close();
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean isReusable() {
+ return true;
+ }
+
+ @Override
+ public void deallocate() {
+
+ }
}