Merge branch 'master' into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index aea04b2..55da00e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -38,6 +38,8 @@
 
 public class AssignPOperator extends AbstractPhysicalOperator {
 
+    private boolean flushFramesRapidly; // handed to AssignRuntimeFactory; stays false unless setRapidFrameFlush(true) is called
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.ASSIGN;
@@ -76,7 +78,8 @@
         // TODO push projections into the operator
         int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
 
-        AssignRuntimeFactory runtime = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+        AssignRuntimeFactory runtime = new AssignRuntimeFactory(outColumns, evalFactories, projectionList,
+                flushFramesRapidly);
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
@@ -92,4 +95,8 @@
         return true;
     }
 
+    public void setRapidFrameFlush(boolean flushFramesRapidly) { // stores the flag later passed to AssignRuntimeFactory
+        this.flushFramesRapidly = flushFramesRapidly;
+    }
+
 }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 082e98a..6c1dd5e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -43,6 +43,10 @@
     }
 
     protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
+        appendToFrameFromTupleBuilder(tb, false); // flushFrame=false: the frame is flushed only when the tuple does not fit
+    }
+
+    protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb, boolean flushFrame) throws HyracksDataException {
         if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
             FrameUtils.flushFrame(frame, writer);
             appender.reset(frame, true);
@@ -51,6 +55,10 @@
                         "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
             }
         }
+        if (flushFrame) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+        }
     }
 
     protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index fa99c15..fb889ea 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -36,6 +36,7 @@
 
     private int[] outColumns;
     private IScalarEvaluatorFactory[] evalFactories;
+    private final boolean flushFramesRapidly;
 
     /**
      * @param outColumns
@@ -46,9 +47,15 @@
      */
 
     public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList) {
+        this(outColumns, evalFactories, projectionList, false); // rapid frame flushing is off for this overload
+    }
+
+    public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList,
+            boolean flushFramesRapidly) {
         super(projectionList);
         this.outColumns = outColumns;
         this.evalFactories = evalFactories;
+        this.flushFramesRapidly = flushFramesRapidly; // read in nextFrame to force a flush after a frame's last tuple
     }
 
     @Override
@@ -107,9 +114,27 @@
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    produceTuple(tupleBuilder, tAccess, t, tRef);
+                if (nTuple < 1) {
+                    // An empty input frame has no tuple 0 to evaluate. The loop this patch replaces did
+                    // nothing in that case, so return instead of resetting tRef onto a missing tuple.
+                    return;
+                }
+
+                // Every tuple except the last is evaluated and appended without a forced flush.
+                int lastTuple = nTuple - 1;
+                for (int t = 0; t < lastTuple; t++) {
+                    tRef.reset(tAccess, t);
+                    produceTuple(tupleBuilder, tAccess, t, tRef);
+                    appendToFrameFromTupleBuilder(tupleBuilder);
+                }
+
+                tRef.reset(tAccess, lastTuple);
+                produceTuple(tupleBuilder, tAccess, lastTuple, tRef);
+                if (flushFramesRapidly) {
+                    // Whenever all the tuples in the incoming frame have been consumed, the assign operator
+                    // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
+                    appendToFrameFromTupleBuilder(tupleBuilder, true);
+                } else {
                     appendToFrameFromTupleBuilder(tupleBuilder);
                 }
             }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 7f10948..3e87f31 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -34,8 +34,7 @@
     }
 
     public StreamProjectRuntimeFactory(int[] projectionList) {
-        super(projectionList);
-        this.flushFramesRapidly = false;
+        this(projectionList, false); // NOTE(review): assumes the 2-arg ctor's boolean is flushFramesRapidly -- confirm
     }
 
     @Override
@@ -66,8 +65,10 @@
                 int nTuple = tAccess.getTupleCount();
 
                 int t = 0;
-                for (; t < nTuple - 1; t++) {
-                    appendProjectionToFrame(t, projectionList);
+                if (nTuple > 1) {
+                    for (; t < nTuple - 1; t++) {
+                        appendProjectionToFrame(t, projectionList);
+                    }
                 }
                 if (flushFramesRapidly) {
                     // Whenever all the tuples in the incoming frame have been consumed, the project operator 
@@ -76,10 +77,8 @@
                 } else {
                     appendProjectionToFrame(t, projectionList);
                 }
-
             }
 
         };
     }
-
 }