[NO ISSUE][RT] Collect Tuple Stats in External Scan

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Report number of procssed tuples in external scan operator.
- Add test case.

Change-Id: I5dda25f1fc53581dcc5663f2516e79b9b66fd0a5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6224
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
index da7ba31..a2c3ae5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
@@ -40,5 +40,10 @@
         <output-dir compare="Text">secondary-index-index-only</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="metrics">
+      <compilation-unit name="external-dataset">
+        <output-dir compare="Text">external-dataset</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-dataset.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-dataset.1.ddl.sqlpp
new file mode 100644
index 0000000..a105dfe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-dataset.1.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on external dataset
+ * Expected Res : Success
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE t1 AS {f1: string, f2: string, f3: string, f4: string, f5: string};
+
+CREATE EXTERNAL DATASET ds1(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_09.csv"), ("format"="CSV"), ("header"="FALSE"));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-dataset.2.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-dataset.2.metrics.sqlpp
new file mode 100644
index 0000000..e57e938
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-dataset.2.metrics.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on external dataset
+ * Expected Res : Success
+ */
+
+USE test;
+
+SELECT COUNT(*) from ds1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-datasett.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-datasett.3.ddl.sqlpp
new file mode 100644
index 0000000..0bf95e4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/external-dataset/external-datasett.3.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Processed objects metrics on external dataset
+ * Expected Res : Success
+ */
+
+DROP DATAVERSE test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/external-dataset/external-dataset.2.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/external-dataset/external-dataset.2.regexadm
new file mode 100644
index 0000000..ae84a71
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/external-dataset/external-dataset.2.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":15.*
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
index 18f59f2..8fc70b8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
@@ -49,4 +49,11 @@
      * @throws Exception
      */
     public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException;
+
+    /**
+     * @return The number of processed tuples by this adapter
+     */
+    default long getProcessedTuples() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index f59b82e..ccc420b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -43,4 +43,11 @@
     public default boolean stop(long timeout) throws HyracksDataException {
         throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
+
+    /**
+     * @return The number of processed tuples by this controller
+     */
+    default long getProcessedTuples() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
index a324496..95024e1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -24,8 +24,14 @@
 public abstract class AbstractDataFlowController implements IDataFlowController {
 
     protected final IHyracksTaskContext ctx;
+    protected long processedTuples = 0;
 
     public AbstractDataFlowController(IHyracksTaskContext ctx) {
         this.ctx = ctx;
     }
+
+    @Override
+    public long getProcessedTuples() {
+        return processedTuples;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 34379e9..2c19f9d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -44,6 +44,7 @@
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
+            processedTuples = 0;
             ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
             TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
             while (recordReader.hasNext()) {
@@ -53,6 +54,7 @@
                     tb.addFieldEndOffset();
                     appendOtherTupleFields(tb);
                     tupleForwarder.addTuple(tb);
+                    processedTuples++;
                 }
             }
             tupleForwarder.complete();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index 8275953..9c11c97 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -35,6 +35,7 @@
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
         try {
+            processedTuples = 0;
             ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
             TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
             while (true) {
@@ -44,6 +45,7 @@
                 }
                 tb.addFieldEndOffset();
                 tupleForwarder.addTuple(tb);
+                processedTuples++;
             }
             tupleForwarder.complete();
         } catch (Exception e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 0ab59fe..123a552 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -38,6 +38,11 @@
         controller.start(writer);
     }
 
+    @Override
+    public long getProcessedTuples() {
+        return controller.getProcessedTuples();
+    }
+
     public boolean stop(long timeout) throws HyracksDataException {
         return controller.stop(timeout);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 0904384..17a134b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -35,4 +35,9 @@
     public void start(int partition, IFrameWriter writer) throws HyracksDataException, InterruptedException {
         controller.start(writer);
     }
+
+    @Override
+    public long getProcessedTuples() {
+        return controller.getProcessedTuples();
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 4fd5151..1d7623d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
@@ -53,13 +54,21 @@
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
+            private IOperatorStats stats;
+
             @Override
             public void initialize() throws HyracksDataException {
-                IDataSourceAdapter adapter = null;
+                IDataSourceAdapter adapter;
+                if (ctx.getStatsCollector() != null) {
+                    stats = ctx.getStatsCollector().getOrAddOperatorStats(getDisplayName());
+                }
                 try {
                     writer.open();
                     adapter = adapterFactory.createAdapter(ctx, partition);
                     adapter.start(partition, writer);
+                    if (stats != null) {
+                        stats.getTupleCounter().update(adapter.getProcessedTuples());
+                    }
                 } catch (Exception e) {
                     writer.fail();
                     throw HyracksDataException.create(e);