Added flush call to IFrameWriter

git-svn-id: https://hyracks.googlecode.com/svn/trunk@107 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
index eccd6cb..0ded474 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -23,5 +23,7 @@
 
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
 
+    public void flush() throws HyracksDataException;
+
     public void close() throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b80d0f5..5677827 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -390,6 +390,11 @@
                         .put("framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
                                 + receiverIndex, String.valueOf(frameCount));
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                writer.flush();
+            }
         } : writer;
     }
 
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
index b4868c0..927d67b 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
@@ -229,6 +229,10 @@
         @Override
         public void open() throws HyracksDataException {
         }
+
+        @Override
+        public void flush() {
+        }
     }
 
     private final class ConnectionListenerThread extends Thread {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index ca7867f..0a56bea 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -50,4 +50,8 @@
     public void open() throws HyracksDataException {
         writer.open();
     }
+
+    @Override
+    public void flush() {
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
index bebf7d4..f5180f8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
@@ -23,4 +23,8 @@
     public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public final void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java
index 0bbf252..5e934c8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java
@@ -92,4 +92,18 @@
             }
         }
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        for (int i = 0; i < appenders.length; ++i) {
+            FrameTupleAppender appender = appenders[i];
+            if (appender.getTupleCount() > 0) {
+                ByteBuffer buffer = appender.getBuffer();
+                IFrameWriter frameWriter = epWriters[i];
+                flushFrame(buffer, frameWriter);
+                epWriters[i].flush();
+                appender.reset(buffer, true);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
index de8abb4..5346334 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
@@ -90,6 +90,20 @@
                 appenders[i].reset(appenders[i].getBuffer(), true);
             }
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+            for (int i = 0; i < appenders.length; ++i) {
+                FrameTupleAppender appender = appenders[i];
+                if (appender.getTupleCount() > 0) {
+                    ByteBuffer buffer = appender.getBuffer();
+                    IFrameWriter frameWriter = epWriters[i];
+                    flushFrame(buffer, frameWriter);
+                    epWriters[i].flush();
+                    appender.reset(buffer, true);
+                }
+            }
+        }
     }
 
     private final int partitioningField;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 505c616..1ee7fda 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -35,8 +35,9 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc, IEndpointDataWriterFactory edwFactory,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
         for (int i = 0; i < nConsumerPartitions; ++i) {
             epWriters[i] = edwFactory.createFrameWriter(i);
@@ -66,12 +67,20 @@
                     epWriters[i].open();
                 }
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                for (int i = 0; i < epWriters.length; ++i) {
+                    epWriters[i].flush();
+                }
+            }
         };
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc, IConnectionDemultiplexer demux,
-        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+            IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 59d6ed5..a568781 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String HASHTABLE = "hashtable";
@@ -104,6 +105,10 @@
                 public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
         }
 
@@ -119,9 +124,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            return new IOperatorNodePushable() {
-                private IFrameWriter writer;
-
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void open() throws HyracksDataException {
                     GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
@@ -132,22 +135,9 @@
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    throw new IllegalStateException();
-                }
-
-                @Override
                 public void close() throws HyracksDataException {
                     // do nothing
                 }
-
-                @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    if (index != 0) {
-                        throw new IllegalArgumentException();
-                    }
-                    this.writer = writer;
-                }
             };
         }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 0b254da..c124cb3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -42,6 +42,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String SMALLRELATION = "RelR";
@@ -213,6 +214,9 @@
                     }
                 }
 
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
@@ -236,10 +240,9 @@
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
 
-            IOperatorNodePushable op = new IOperatorNodePushable() {
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
-                private IFrameWriter writer;
                 private FileChannel[] channelsR;
                 private FileChannel[] channelsS;
                 private int numPartitions;
@@ -318,24 +321,11 @@
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    throw new IllegalStateException();
-                }
-
-                @Override
                 public void close() throws HyracksDataException {
                     env.set(LARGERELATION, null);
                     env.set(SMALLRELATION, null);
                     env.set(NUM_PARTITION, null);
                 }
-
-                @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    if (index != 0) {
-                        throw new IllegalStateException();
-                    }
-                    this.writer = writer;
-                }
             };
             return op;
         }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 24b655d9..1da6b61 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -313,6 +313,9 @@
                     ftappender.reset(inBuffer, true);
                 }
 
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
@@ -553,6 +556,11 @@
                     }
                     this.writer = writer;
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
             };
             return op;
         }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index cf9a373..7e64312 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -116,6 +116,10 @@
                 public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
@@ -161,6 +165,11 @@
                     }
                     this.writer = writer;
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
             };
             return op;
         }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 6b3de09..0bfe83d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -110,6 +111,10 @@
                 public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
         }
 
@@ -125,9 +130,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            return new IOperatorNodePushable() {
-                private IFrameWriter writer;
-
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void open() throws HyracksDataException {
                     try {
@@ -154,22 +157,9 @@
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    throw new IllegalStateException();
-                }
-
-                @Override
                 public void close() throws HyracksDataException {
                     env.set(MATERIALIZED_FILE, null);
                 }
-
-                @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    if (index != 0) {
-                        throw new IllegalArgumentException();
-                    }
-                    this.writer = writer;
-                }
             };
         }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 3b7b02c..b35954f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -38,6 +38,10 @@
             @Override
             public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
             }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
         };
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index ea2a556..4594685 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
@@ -275,6 +276,10 @@
                 public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
@@ -295,8 +300,7 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-            IOperatorNodePushable op = new IOperatorNodePushable() {
-                private IFrameWriter writer;
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 private List<ByteBuffer> inFrames;
                 private ByteBuffer outFrame;
                 LinkedList<File> runs;
@@ -336,23 +340,10 @@
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    throw new IllegalStateException();
-                }
-
-                @Override
                 public void close() throws HyracksDataException {
                     // do nothing
                 }
 
-                @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    if (index != 0) {
-                        throw new IllegalArgumentException();
-                    }
-                    this.writer = writer;
-                }
-
                 // creates a new run from runs that can fit in memory.
                 private void doPass(LinkedList<File> runs, int passCount) throws ClassNotFoundException, Exception {
                     File newRun = null;
@@ -559,6 +550,10 @@
                 throw new HyracksDataException(e);
             }
         }
+
+        @Override
+        public void flush() throws HyracksDataException {
+        }
     }
 
     public static class RunFileReader implements IFrameReader {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 0b95a5a..be81ecd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String BUFFERS = "buffers";
@@ -208,6 +209,10 @@
                 public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
                     throw new IllegalArgumentException();
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             };
             return op;
         }
@@ -224,9 +229,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            IOperatorNodePushable op = new IOperatorNodePushable() {
-                private IFrameWriter writer;
-
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void open() throws HyracksDataException {
                     List<ByteBuffer> buffers = (List<ByteBuffer>) env.get(BUFFERS);
@@ -265,22 +268,9 @@
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    throw new IllegalStateException();
-                }
-
-                @Override
                 public void close() throws HyracksDataException {
                     // do nothing
                 }
-
-                @Override
-                public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    if (index != 0) {
-                        throw new IllegalArgumentException();
-                    }
-                    this.writer = writer;
-                }
             };
             return op;
         }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index 13bba15..037e546 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -61,4 +61,8 @@
     public void open() throws HyracksDataException {
         delegate.open();
     }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 6ddc8ec..56569d9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -69,6 +69,10 @@
                 public void close() throws HyracksDataException {
 
                 }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
             });
         }
 
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index e88cca4..d6b0036 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -68,5 +68,8 @@
 			e.printStackTrace();
 		}
 	}
-	
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }	
 }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index 47c612b..74c878a 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -90,4 +90,8 @@
 	@Override
 	public void close() throws HyracksDataException {            	
 	}
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
 }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java
index 6cc64ee..eddbbfa 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java
@@ -63,5 +63,9 @@
 		} catch (Exception e) {
 			e.printStackTrace();
 		}
-	}	
+	}
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }	
 }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 69f6490..e360be2 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -108,5 +108,9 @@
 	
 	@Override
 	public void close() throws HyracksDataException {            	
-	}			
-}
+	}
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }			
+}
\ No newline at end of file