Merge branch 'gerrit/stabilization-667a908755' into neo

Change-Id: I631115c8e29803fc32188559c48ebbbd16d5ccfa
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java
index d3af00d..241e3e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -51,9 +51,7 @@
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("appendTuple(): tuple size: " + count);
         }
-        boolean appended = frameTupleAppender.append(buf, 0, count);
-        count = 0;
-        return appended;
+        return frameTupleAppender.append(buf, 0, count);
     }
 
     public void flush(IFrameWriter writer) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index a4cfa13..426121c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.api.dataflow.value.IResultSerializer;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -43,14 +44,12 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
     private static final long serialVersionUID = 1L;
 
     private final ResultSetId rsId;
-
     private final IResultMetadata metadata;
-
     private final boolean asyncMode;
-
     private final IResultSerializerFactory resultSerializerFactory;
     private final long maxReads;
 
@@ -105,10 +104,12 @@
                     resultSerializer.appendTuple(frameTupleAccessor, tIndex);
                     if (!frameOutputStream.appendTuple()) {
                         frameOutputStream.flush(resultPartitionWriter);
-
-                        resultSerializer.appendTuple(frameTupleAccessor, tIndex);
-                        frameOutputStream.appendTuple();
+                        if (!frameOutputStream.appendTuple()) {
+                            throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME,
+                                    frameOutputStream.getLength());
+                        }
                     }
+                    frameOutputStream.reset();
                 }
             }