[NO ISSUE][RT] Always write frame on flush

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- When flush is called on a frame appender, always push
  the frame to the next writer even if the frame has no
  tuples. This gives each operator a chance to perform
  any non-tuple related operations even if the frame is
  empty.
- Update operators to properly handle empty frames.
- Add test case for join with empty dataset.

Change-Id: I269c3a532e2327d83e22e7276db3856d371f7105
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7764
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.1.ddl.sqlpp
new file mode 100644
index 0000000..7502f37
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.1.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+create type openType as {id: string};
+create dataset ds1(openType) primary key id;
+create dataset ds2(openType) primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.2.update.sqlpp
new file mode 100644
index 0000000..ec06976
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.2.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+insert into ds1 {"id": "1", "f": 3};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.3.query.sqlpp
new file mode 100644
index 0000000..57ab8c1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.3.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+from ds1 left outer join ds2 on ds1.f = ds2.f
+select ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.4.query.sqlpp
new file mode 100644
index 0000000..d0b21b8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.4.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+from ds2 left outer join ds1 on ds2.f = ds1.f
+select ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.5.query.sqlpp
new file mode 100644
index 0000000..3dba765
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.5.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+from ds1 join ds2 on ds1.f = ds2.f
+select ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.6.query.sqlpp
new file mode 100644
index 0000000..851be3a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/join-with-empty-dataset/join-with-empty-dataset.6.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+from ds2 join ds1 on ds2.f = ds1.f
+select ds1, ds2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.03.adm
new file mode 100644
index 0000000..2cddb05
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.03.adm
@@ -0,0 +1 @@
+{ "ds1": { "id": "1", "f": 3 } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.04.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.04.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.05.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.05.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.06.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.06.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/join-with-empty-dataset/join-with-empty-dataset.06.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index a390875..9e83b57 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6188,6 +6188,11 @@
         <output-dir compare="Text">hash_join_record</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="join">
+      <compilation-unit name="join-with-empty-dataset">
+        <output-dir compare="Text">join-with-empty-dataset</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="list">
     <test-case FilePath="list">
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 71b44d3..9f4541f5 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -70,13 +70,11 @@
     }
 
     protected void flushAndReset() throws HyracksDataException {
-        if (appender.getTupleCount() > 0) {
-            appender.write(writer, true);
-        }
+        appender.write(writer, true);
     }
 
     protected void flushIfNotFailed() throws HyracksDataException {
-        if (!failed) {
+        if (!failed && appender.getTupleCount() > 0) {
             flushAndReset();
         }
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 3cee12d..6e5c2c9 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -224,7 +224,9 @@
                 for (int t = 0; t < nTuple; t++) {
                     appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t);
                 }
-                smthWasWritten = true;
+                if (nTuple > 0) {
+                    smthWasWritten = true;
+                }
             }
 
             @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index a9371e9..59ab2f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -110,14 +110,6 @@
         return false;
     }
 
-    @Override
-    public void flush(IFrameWriter writer) throws HyracksDataException {
-        if (tupleCount > 0) {
-            write(writer, true);
-        }
-        writer.flush();
-    }
-
     public void flush(IFrameWriter writer, ITracer tracer, String name, long traceCategory, String args)
             throws HyracksDataException {
         final long tid = tracer.durationB(name, traceCategory, args);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index fb2d4e9..4b1e4aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -112,10 +112,13 @@
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
-        buffers.add(buffer);
-        int bIndex = buffers.size() - 1;
         accessorBuild.reset(buffer);
         int tCount = accessorBuild.getTupleCount();
+        if (tCount <= 0) {
+            return;
+        }
+        buffers.add(buffer);
+        int bIndex = buffers.size() - 1;
         for (int i = 0; i < tCount; ++i) {
             int entry = tpcBuild.partition(accessorBuild, i, table.getTableSize());
             storedTuplePointer.reset(bIndex, i);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 361d1ee..b7693c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -96,7 +96,10 @@
     }
 
     public void cache(ByteBuffer buffer) throws HyracksDataException {
-        runFileWriter.nextFrame(buffer);
+        accessorInner.reset(buffer);
+        if (accessorInner.getTupleCount() > 0) {
+            runFileWriter.nextFrame(buffer);
+        }
     }
 
     /**
@@ -109,6 +112,10 @@
     }
 
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
+        accessorOuter.reset(outerBuffer);
+        if (accessorOuter.getTupleCount() <= 0) {
+            return;
+        }
         if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
             RunFileReader runFileReader = runFileWriter.createReader();
             try {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 74223d8..b121fbb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -139,6 +139,9 @@
     @Override
     public boolean insertFrame(ByteBuffer inputBuffer) throws HyracksDataException {
         inputTupleAccessor.reset(inputBuffer);
+        if (inputTupleAccessor.getTupleCount() <= 0) {
+            return true;
+        }
         long requiredMemory = getRequiredMemory(inputTupleAccessor);
         if (totalMemoryUsed + requiredMemory <= maxSortMemory && bufferManager.insertFrame(inputBuffer) >= 0) {
             // we have enough memory