Refactor ResultWriterOperatorDescriptor to use the new interface names and use the rewritten FrameOutput with different arguments to be passed.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2980 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 66b8fbd..7518449 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -15,24 +15,23 @@
package edu.uci.ics.hyracks.dataflow.std.result;
import java.io.IOException;
+import java.io.PrintStream;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppender;
-import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializedAppenderFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameOutputStream;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameOutputStream;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
@@ -45,8 +44,7 @@
private final RecordDescriptor recordDescriptor;
- private final IResultSerializedAppenderFactory resultSerializedAppenderFactory;
-
+ private final IResultSerializerFactory resultSerializerFactory;
public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
RecordDescriptor recordDescriptor, IResultSerializerFactory resultSerializerFactory) throws IOException {
@@ -54,7 +52,7 @@
this.rsId = rsId;
this.ordered = ordered;
this.recordDescriptor = recordDescriptor;
- this.resultSerializedAppenderFactory = resultSerializedAppenderFactory;
+ this.resultSerializerFactory = resultSerializerFactory;
}
@Override
@@ -62,16 +60,13 @@
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
- final ByteBuffer tempBuffer = ctx.allocateFrame();
- tempBuffer.clear();
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
- final FrameTupleAppender frameTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
- frameTupleAppender.reset(tempBuffer, true);
+ final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
+ frameOutputStream.reset(outputBuffer, true);
+ PrintStream printStream = new PrintStream(frameOutputStream);
- final FrameOutputStream frameOutputStream = new FrameOutputStream(frameTupleAppender);
-
- final IResultSerializedAppender resultSerializedAppender = resultSerializedAppenderFactory
- .createResultSerializer(frameOutputStream);
+ final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(printStream);
final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
@@ -92,14 +87,16 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
frameTupleAccessor.reset(buffer);
for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- if (!resultSerializedAppender.appendTuple(frameTupleAccessor, tIndex)) {
- datasetPartitionWriter.nextFrame(tempBuffer);
- frameTupleAppender.reset(tempBuffer, true);
+ resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+ if (!frameOutputStream.appendTuple()) {
+ datasetPartitionWriter.nextFrame(outputBuffer);
+ frameOutputStream.reset(outputBuffer, true);
- /* TODO(madhusudancs): This works under the assumption that no single JSON-ified record is
+ /* TODO(madhusudancs): This works under the assumption that no single serialized record is
* longer than the buffer size.
*/
- resultSerializedAppender.appendTuple(frameTupleAccessor, tIndex);
+ resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+ frameOutputStream.appendTuple();
}
}
}
@@ -111,9 +108,9 @@
@Override
public void close() throws HyracksDataException {
- if (frameTupleAppender.getTupleCount() > 0) {
- datasetPartitionWriter.nextFrame(tempBuffer);
- frameTupleAppender.reset(tempBuffer, true);
+ if (frameOutputStream.getTupleCount() > 0) {
+ datasetPartitionWriter.nextFrame(outputBuffer);
+ frameOutputStream.reset(outputBuffer, true);
}
datasetPartitionWriter.close();
}