Fix for issue 367. Project ops between the commit and the first insert op will push the frames without waiting until they get full.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
index 459e5dd..d2593cb 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
@@ -30,14 +30,17 @@
 public class ProjectOperator extends AbstractLogicalOperator {
 
     private final List<LogicalVariable> variables;
+    private boolean flushFramesRapidly;
 
     public ProjectOperator(List<LogicalVariable> variables) {
         this.variables = variables;
+        this.flushFramesRapidly = false;
     }
 
     public ProjectOperator(LogicalVariable v) {
         this.variables = new ArrayList<LogicalVariable>(1);
         this.getVariables().add(v);
+        this.flushFramesRapidly = false;
     }
 
     @Override
@@ -75,6 +78,14 @@
         return variables;
     }
 
+    public boolean isRapidFrameFlush() {
+        return flushFramesRapidly;
+    }
+    
+    public void setRapidFrameFlush(boolean flushFramesRapidly) {
+        this.flushFramesRapidly = flushFramesRapidly;
+    }
+
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         return createPropagatingAllInputsTypeEnvironment(ctx);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 54c0505..90c58a0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -61,8 +61,10 @@
             }
             projectionList[i++] = pos;
         }
-        StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList);
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList,
+                project.isRapidFrameFlush());
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
         builder.contributeMicroOperator(project, runtime, recDesc);
         ILogicalOperator src = project.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, project, 0);
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index cd9931b..78d2bed 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -64,6 +64,23 @@
         }
     }
 
+    protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
+            throws HyracksDataException {
+        if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+                throw new IllegalStateException(
+                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
+            }
+            return;
+        }
+        if (flushFrame) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+        }
+    }
+
     protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
         if (!appender.append(tAccess, tIndex)) {
             FrameUtils.flushFrame(frame, writer);
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 455ad8e..da881ec 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -26,9 +26,16 @@
 public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
+    private final boolean flushFramesRapidly;
+
+    public StreamProjectRuntimeFactory(int[] projectionList, boolean flushFramesRapidly) {
+        super(projectionList);
+        this.flushFramesRapidly = flushFramesRapidly;
+    }
 
     public StreamProjectRuntimeFactory(int[] projectionList) {
         super(projectionList);
+        this.flushFramesRapidly = false;
     }
 
     @Override
@@ -57,8 +64,18 @@
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    appendProjectionToFrame(t, projectionList);
+                if (flushFramesRapidly) {
+                    // Whenever all the tuples in the incoming frame have been consumed, the project operator 
+                    // will push its frame to the next operator; i.e., it won't wait until the frame gets full. 
+                    int t = 0;
+                    for (; t < nTuple - 1; t++) {
+                        appendProjectionToFrame(t, projectionList);
+                    }
+                    appendProjectionToFrame(t, projectionList, true);
+                } else {
+                    for (int t = 0; t < nTuple; t++) {
+                        appendProjectionToFrame(t, projectionList);
+                    }
                 }
 
             }