Added a single activity materializing operator.
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index ac5a627..3b76000 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -36,6 +37,7 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -43,24 +45,45 @@
private final static int MATERIALIZER_ACTIVITY_ID = 0;
private final static int READER_ACTIVITY_ID = 1;
+ private final static int MATERIALIZER_READER_ACTIVITY_ID = 2;
+
+ private boolean isSingleActivity;
public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
recordDescriptors[0] = recordDescriptor;
+ isSingleActivity = false;
+ }
+
+ public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor,
+ boolean isSingleActivity) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = recordDescriptor;
+ this.isSingleActivity = isSingleActivity;
}
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
- ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
+ if (isSingleActivity) {
+ MaterializerReaderActivityNode mra = new MaterializerReaderActivityNode(new ActivityId(odId,
+ MATERIALIZER_READER_ACTIVITY_ID));
- builder.addActivity(this, ma);
- builder.addSourceEdge(0, ma, 0);
+ builder.addActivity(this, mra);
+ builder.addSourceEdge(0, mra, 0);
+ builder.addTargetEdge(0, mra, 0);
+ } else {
+ MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+ ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
- builder.addActivity(this, ra);
- builder.addTargetEdge(0, ra, 0);
+ builder.addActivity(this, ma);
+ builder.addSourceEdge(0, ma, 0);
- builder.addBlockingEdge(ma, ra);
+ builder.addActivity(this, ra);
+ builder.addTargetEdge(0, ra, 0);
+
+ builder.addBlockingEdge(ma, ra);
+ }
+
}
public static class MaterializerTaskState extends AbstractStateObject {
@@ -82,6 +105,72 @@
public void fromBytes(DataInput in) throws IOException {
}
+
+ public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
+ out.nextFrame(buffer);
+ }
+
+ public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
+ RunFileReader in = out.createReader();
+ writer.open();
+ try {
+ in.open();
+ while (in.nextFrame(frame)) {
+ frame.flip();
+ writer.nextFrame(frame);
+ frame.clear();
+ }
+ in.close();
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+ }
+
+ private final class MaterializerReaderActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public MaterializerReaderActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private MaterializerTaskState state;
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ MaterializingOperatorDescriptor.class.getSimpleName());
+ state.out = new RunFileWriter(file, ctx.getIOManager());
+ state.out.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ state.appendFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ state.out.close();
+ ByteBuffer frame = ctx.allocateFrame();
+ state.writeOut(writer, frame);
+ }
+
+ };
+ }
}
private final class MaterializerActivityNode extends AbstractActivityNode {
@@ -109,7 +198,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.out.nextFrame(buffer);
+ state.appendFrame(buffer);
}
@Override
@@ -141,22 +230,7 @@
ByteBuffer frame = ctx.allocateFrame();
MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
- RunFileReader in = state.out.createReader();
- writer.open();
- try {
- in.open();
- while (in.nextFrame(frame)) {
- frame.flip();
- writer.nextFrame(frame);
- frame.clear();
- }
- in.close();
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
+ state.writeOut(writer, frame);
}
@Override