diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index ac5a627..3b76000 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -36,6 +37,7 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -43,24 +45,45 @@
 
     private final static int MATERIALIZER_ACTIVITY_ID = 0;
     private final static int READER_ACTIVITY_ID = 1;
+    private final static int MATERIALIZER_READER_ACTIVITY_ID = 2;
+
+    private boolean isSingleActivity;
 
     public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
         recordDescriptors[0] = recordDescriptor;
+        isSingleActivity = false;
+    }
+
+    public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor,
+            boolean isSingleActivity) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = recordDescriptor;
+        this.isSingleActivity = isSingleActivity;
     }
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
-        ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
+        if (isSingleActivity) {
+            MaterializerReaderActivityNode mra = new MaterializerReaderActivityNode(new ActivityId(odId,
+                    MATERIALIZER_READER_ACTIVITY_ID));
 
-        builder.addActivity(this, ma);
-        builder.addSourceEdge(0, ma, 0);
+            builder.addActivity(this, mra);
+            builder.addSourceEdge(0, mra, 0);
+            builder.addTargetEdge(0, mra, 0);
+        } else {
+            MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+            ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
 
-        builder.addActivity(this, ra);
-        builder.addTargetEdge(0, ra, 0);
+            builder.addActivity(this, ma);
+            builder.addSourceEdge(0, ma, 0);
 
-        builder.addBlockingEdge(ma, ra);
+            builder.addActivity(this, ra);
+            builder.addTargetEdge(0, ra, 0);
+
+            builder.addBlockingEdge(ma, ra);
+        }
+
     }
 
     public static class MaterializerTaskState extends AbstractStateObject {
@@ -82,6 +105,72 @@
         public void fromBytes(DataInput in) throws IOException {
 
         }
+
+        public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
+            out.nextFrame(buffer);
+        }
+
+        public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
+            RunFileReader in = out.createReader();
+            writer.open();
+            try {
+                in.open();
+                while (in.nextFrame(frame)) {
+                    frame.flip();
+                    writer.nextFrame(frame);
+                    frame.clear();
+                }
+                in.close();
+            } catch (Exception e) {
+                writer.fail();
+                throw new HyracksDataException(e);
+            } finally {
+                writer.close();
+            }
+        }
+    }
+
+    private final class MaterializerReaderActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MaterializerReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private MaterializerTaskState state;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                            MaterializingOperatorDescriptor.class.getSimpleName());
+                    state.out = new RunFileWriter(file, ctx.getIOManager());
+                    state.out.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    state.appendFrame(buffer);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    state.out.close();
+                    ByteBuffer frame = ctx.allocateFrame();
+                    state.writeOut(writer, frame);
+                }
+
+            };
+        }
     }
 
     private final class MaterializerActivityNode extends AbstractActivityNode {
@@ -109,7 +198,7 @@
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    state.out.nextFrame(buffer);
+                    state.appendFrame(buffer);
                 }
 
                 @Override
@@ -141,22 +230,7 @@
                     ByteBuffer frame = ctx.allocateFrame();
                     MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                             getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
-                    RunFileReader in = state.out.createReader();
-                    writer.open();
-                    try {
-                        in.open();
-                        while (in.nextFrame(frame)) {
-                            frame.flip();
-                            writer.nextFrame(frame);
-                            frame.clear();
-                        }
-                        in.close();
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        writer.close();
-                    }
+                    state.writeOut(writer, frame);
                 }
 
                 @Override
