[ASTERIXDB-2108][API][RT] Add Processed Objects Metric
- user model changes: no
- storage format changes: no
- interface changes: yes
Introduced IOperatorStats and IStatsCollector APIs to collect
runtime stats.
Details:
- Introduce OperatorStats API to report operators runtime stats.
- Introduce StatsCollector API to report task runtime stats.
- Implement OperatorStats for IndexSearchOperatorNodePushable
(tuple counter only).
- Add "processedObjects" metric to QueryService API.
- Add Stats to ExecuteStatementResponseMessage to pass stats
from CC to NCQueryService.
- Add metrics test cases.
Change-Id: Ie4afe6a676ef0b8a31d36d7dafc13a4023ebf177
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2032
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 57c4809..d76c421 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -68,11 +68,13 @@
public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
return resultSets;
}
+
}
- public static class Stats {
+ class Stats implements Serializable {
private long count;
private long size;
+ private long processedObjects;
public long getCount() {
return count;
@@ -90,6 +92,13 @@
this.size = size;
}
+ public long getProcessedObjects() {
+ return processedObjects;
+ }
+
+ public void setProcessedObjects(long processedObjects) {
+ this.processedObjects = processedObjects;
+ }
}
/**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index f4c3949..da06dd1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -118,6 +118,7 @@
IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
+ stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects());
for (Triple<JobId, ResultSetId, ARecordType> rsmd : resultMetadata.getResultSets()) {
ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index c4f58f6..6a92a26 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -147,7 +147,8 @@
EXECUTION_TIME("executionTime"),
RESULT_COUNT("resultCount"),
RESULT_SIZE("resultSize"),
- ERROR_COUNT("errorCount");
+ ERROR_COUNT("errorCount"),
+ PROCESSED_OBJECTS_COUNT("processedObjects");
private final String str;
@@ -271,7 +272,7 @@
}
private static void printMetrics(PrintWriter pw, long elapsedTime, long executionTime, long resultCount,
- long resultSize, long errorCount) {
+ long resultSize, long processedObjects, long errorCount) {
boolean hasErrors = errorCount != 0;
pw.print("\t\"");
pw.print(ResultFields.METRICS.str());
@@ -283,7 +284,10 @@
pw.print("\t");
ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true);
pw.print("\t");
- ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, hasErrors);
+ ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, true);
+ pw.print("\t");
+ ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), processedObjects, hasErrors);
+ pw.print("\t");
if (hasErrors) {
pw.print("\t");
ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), errorCount, false);
@@ -421,7 +425,7 @@
}
}
printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
- stats.getCount(), stats.getSize(), errorCount);
+ stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount);
resultWriter.print("}\n");
resultWriter.flush();
String result = stringWriter.toString();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 27cdb66..ed683dc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -119,13 +119,14 @@
MetadataManager.INSTANCE.init();
IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
compilationProvider, storageComponentProvider);
+ final IStatementExecutor.Stats stats = new IStatementExecutor.Stats();
final IRequestParameters requestParameters =
- new RequestParameters(null, delivery, new IStatementExecutor.Stats(), outMetadata, clientContextID,
- optionalParameters);
+ new RequestParameters(null, delivery, stats, outMetadata, clientContextID, optionalParameters);
translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters);
outPrinter.close();
responseMsg.setResult(outWriter.toString());
responseMsg.setMetadata(outMetadata);
+ responseMsg.setStats(stats);
} catch (AlgebricksException | HyracksException | TokenMgrError
| org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
// we trust that "our" exceptions are serializable and have a comprehensible error message
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
index 54f0a4e..7475be4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -35,6 +35,8 @@
private IStatementExecutor.ResultMetadata metadata;
+ private IStatementExecutor.Stats stats;
+
private Throwable error;
public ExecuteStatementResponseMessage(long requestMessageId) {
@@ -74,6 +76,14 @@
this.metadata = metadata;
}
+ public IStatementExecutor.Stats getStats() {
+ return stats;
+ }
+
+ public void setStats(IStatementExecutor.Stats stats) {
+ this.stats = stats;
+ }
+
@Override
public String toString() {
return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2abc18f..454b501 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -26,6 +26,7 @@
import java.io.InputStreamReader;
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
@@ -195,7 +196,14 @@
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
+import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
/*
@@ -2367,12 +2375,14 @@
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
+ updateJobStats(id, stats);
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
metadataProvider.findOutputRecordType());
}, clientContextId, ctx);
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
+ updateJobStats(id, stats);
ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
if (outMetadata != null) {
outMetadata.getResultSets()
@@ -2385,6 +2395,25 @@
}
}
+ private void updateJobStats(JobId jobId, Stats stats) {
+ final IJobManager jobManager =
+ ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
+ final JobRun run = jobManager.get(jobId);
+ if (run == null || run.getStatus() != JobStatus.TERMINATED) {
+ return;
+ }
+ final JobProfile jobProfile = run.getJobProfile();
+ final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values();
+ long processedObjects = 0;
+ for (JobletProfile jp : jobletProfiles) {
+ final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values();
+ for (TaskProfile tp : jobletTasksProfile) {
+ processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
+ }
+ }
+ stats.setProcessedObjects(processedObjects);
+ }
+
private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, String clientContextId, IStatementExecutorContext ctx,
ResultSetId resultSetId, MutableBoolean printed) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 8c0f8e1..01b280d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -20,7 +20,11 @@
import java.io.InputStream;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -40,47 +44,88 @@
*/
public class ResultExtractor {
+ private enum ResultField {
+ RESULTS("results"),
+ REQUEST_ID("requestID"),
+ METRICS("metrics"),
+ CLIENT_CONTEXT_ID("clientContextID"),
+ SIGNATURE("signature"),
+ STATUS("status"),
+ TYPE("type"),
+ ERRORS("errors");
+
+ private static final Map<String, ResultField> fields = new HashMap<>();
+
+ static {
+ for (ResultField field : ResultField.values()) {
+ fields.put(field.getFieldName(), field);
+ }
+ }
+
+ private String fieldName;
+
+ ResultField(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public static ResultField ofFieldName(String fieldName) {
+ return fields.get(fieldName);
+ }
+ }
+
private static final Logger LOGGER = Logger.getLogger(ResultExtractor.class.getName());
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static InputStream extract(InputStream resultStream) throws Exception {
- ObjectMapper om = new ObjectMapper();
- String resultStr = IOUtils.toString(resultStream, Charset.defaultCharset());
- PrettyPrinter singleLine = new SingleLinePrettyPrinter();
- ObjectNode result = om.readValue(resultStr, ObjectNode.class);
+ return extract(resultStream, EnumSet.of(ResultField.RESULTS, ResultField.ERRORS));
+ }
+
+ public static InputStream extractMetrics(InputStream resultStream) throws Exception {
+ return extract(resultStream, EnumSet.of(ResultField.METRICS, ResultField.ERRORS));
+ }
+
+ public static String extractHandle(InputStream resultStream) throws Exception {
+ String result = IOUtils.toString(resultStream, StandardCharsets.UTF_8);
+ ObjectNode resultJson = OBJECT_MAPPER.readValue(result, ObjectNode.class);
+ final JsonNode handle = resultJson.get("handle");
+ if (handle != null) {
+ return handle.asText();
+ } else {
+ JsonNode errors = resultJson.get("errors");
+ if (errors != null) {
+ JsonNode msg = errors.get(0).get("msg");
+ throw new AsterixException(msg.asText());
+ }
+ }
+ return null;
+ }
+
+ private static InputStream extract(InputStream resultStream, EnumSet<ResultField> resultFields) throws Exception {
+ final String resultStr = IOUtils.toString(resultStream, Charset.defaultCharset());
+ final PrettyPrinter singleLine = new SingleLinePrettyPrinter();
+ final ObjectNode result = OBJECT_MAPPER.readValue(resultStr, ObjectNode.class);
LOGGER.fine("+++++++\n" + result + "\n+++++++\n");
- String type = "";
- String status = "";
- StringBuilder resultBuilder = new StringBuilder();
- String field = "";
- String fieldPrefix = "";
- for (Iterator<String> sIter = result.fieldNames(); sIter.hasNext();) {
+ final StringBuilder resultBuilder = new StringBuilder();
+ String field;
+ String fieldPrefix;
+ for (Iterator<String> sIter = result.fieldNames(); sIter.hasNext(); ) {
field = sIter.next();
fieldPrefix = field.split("-")[0];
- switch (fieldPrefix) {
- case "requestID":
- break;
- case "clientContextID":
- break;
- case "signature":
- break;
- case "status":
- status = om.writeValueAsString(result.get(field));
- break;
- case "type":
- type = om.writeValueAsString(result.get(field));
- break;
- case "metrics":
- LOGGER.fine(om.writeValueAsString(result.get(field)));
- break;
- case "errors":
- JsonNode errors = result.get(field).get(0).get("msg");
- if (!result.get("metrics").has("errorCount")) {
- throw new AsterixException("Request reported error but not an errorCount");
- };
- throw new AsterixException(errors.asText());
- case "results":
+ final ResultField extractedResultField = ResultField.ofFieldName(fieldPrefix);
+ if (extractedResultField == null) {
+ throw new AsterixException("Unanticipated field \"" + field + "\"");
+ }
+ if (!resultFields.contains(extractedResultField)) {
+ continue;
+ }
+ switch (extractedResultField) {
+ case RESULTS:
if (result.get(field).size() <= 1) {
if (result.get(field).size() == 0) {
resultBuilder.append("");
@@ -94,44 +139,41 @@
resultBuilder.append(omm.writer(singleLine).writeValueAsString(result.get(field)));
}
} else {
- resultBuilder.append(om.writeValueAsString(result.get(field)));
+ resultBuilder.append(OBJECT_MAPPER.writeValueAsString(result.get(field)));
}
} else {
JsonNode[] fields = Iterators.toArray(result.get(field).elements(), JsonNode.class);
if (fields.length > 1) {
for (JsonNode f : fields) {
if (f.isObject()) {
- resultBuilder.append(om.writeValueAsString(f));
+
+ resultBuilder.append(OBJECT_MAPPER.writeValueAsString(f));
} else {
resultBuilder.append(f.asText());
}
}
}
+
}
break;
+ case ERRORS:
+ final JsonNode errors = result.get(field).get(0).get("msg");
+ if (!result.get(ResultField.METRICS.getFieldName()).has("errorCount")) {
+ throw new AsterixException("Request reported error but not an errorCount");
+ }
+ throw new AsterixException(errors.asText());
+ case REQUEST_ID:
+ case METRICS:
+ case CLIENT_CONTEXT_ID:
+ case SIGNATURE:
+ case STATUS:
+ case TYPE:
+ resultBuilder.append(OBJECT_MAPPER.writeValueAsString(result.get(field)));
+ break;
default:
- throw new AsterixException("Unanticipated field \"" + field + "\"");
+ throw new IllegalStateException("Unexpected result field: " + extractedResultField);
}
}
-
- return IOUtils.toInputStream(resultBuilder.toString());
+ return IOUtils.toInputStream(resultBuilder.toString(), StandardCharsets.UTF_8);
}
-
- public static String extractHandle(InputStream resultStream) throws Exception {
- final Charset utf8 = Charset.forName("UTF-8");
- ObjectMapper om = new ObjectMapper();
- String result = IOUtils.toString(resultStream, utf8);
- ObjectNode resultJson = om.readValue(result, ObjectNode.class);
- final JsonNode handle = resultJson.get("handle");
- if (handle != null) {
- return handle.asText();
- } else {
- JsonNode errors = resultJson.get("errors");
- if (errors != null) {
- JsonNode msg = errors.get(0).get("msg");
- throw new AsterixException(msg.asText());
- }
- }
- return null;
- }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b9b7bda..43b61b6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -67,9 +67,9 @@
import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
import org.apache.asterix.testframework.context.TestFileContext;
import org.apache.asterix.testframework.xml.ComparisonEnum;
-import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
import org.apache.asterix.testframework.xml.TestCase.CompilationUnit.Parameter;
+import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
@@ -124,6 +124,7 @@
public static final String DELIVERY_ASYNC = "async";
public static final String DELIVERY_DEFERRED = "deferred";
public static final String DELIVERY_IMMEDIATE = "immediate";
+ private static final String METRICS_QUERY_TYPE = "metrics";
private static Method managixExecuteMethod = null;
private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
@@ -888,6 +889,7 @@
case "query":
case "async":
case "deferred":
+ case "metrics":
// isDmlRecoveryTest: insert Crash and Recovery
if (isDmlRecoveryTest) {
executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery"
@@ -1203,7 +1205,9 @@
final URI uri = getEndpoint(Servlets.QUERY_SERVICE);
if (DELIVERY_IMMEDIATE.equals(delivery)) {
resultStream = executeQueryService(statement, fmt, uri, params, true, null, true);
- resultStream = ResultExtractor.extract(resultStream);
+ resultStream = METRICS_QUERY_TYPE.equals(reqType) ?
+ ResultExtractor.extractMetrics(resultStream) :
+ ResultExtractor.extract(resultStream);
} else {
String handleVar = getHandleVariable(statement);
resultStream = executeQueryService(statement, fmt, uri, upsertParam(params, "mode", delivery), true);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
new file mode 100644
index 0000000..c7fae46
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the cluster state runtime tests with the storage parallelism.
+ */
+@RunWith(Parameterized.class)
+public class MetricsExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "MetricsExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.buildTestsInXml("metrics.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public MetricsExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
new file mode 100644
index 0000000..f65ede3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+ QueryFileExtension=".sqlpp">
+ <test-group name="metrics">
+ <test-case FilePath="metrics">
+ <compilation-unit name="full-scan">
+ <output-dir compare="Text">full-scan</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="metrics">
+ <compilation-unit name="primary-index">
+ <output-dir compare="Text">primary-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="metrics">
+ <compilation-unit name="secondary-index">
+ <output-dir compare="Text">secondary-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..441bee4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
new file mode 100644
index 0000000..2c5d5d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
new file mode 100644
index 0000000..3671133
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where name = "Marvella Loud";
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
new file mode 100644
index 0000000..4cccd9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
new file mode 100644
index 0000000..5fb58b4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
new file mode 100644
index 0000000..046c40b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
new file mode 100644
index 0000000..969949e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where cid = 996;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
new file mode 100644
index 0000000..1dbe56e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
new file mode 100644
index 0000000..74f8a19
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
+create index customer_name_idx on Customers(name);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
new file mode 100644
index 0000000..40d87f7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
new file mode 100644
index 0000000..1c5126f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where name = "Marvella Loud";
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
new file mode 100644
index 0000000..1de3b3b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
new file mode 100644
index 0000000..aca314d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":10.*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
new file mode 100644
index 0000000..91a2dfd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":1.*
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
new file mode 100644
index 0000000..8578d41
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":2.*
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index df693b2..10bb336 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
@@ -52,4 +53,6 @@
Object getSharedObject();
Set<JobFlag> getJobFlags();
+
+ IStatsCollector getStatsCollector();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
new file mode 100644
index 0000000..35ae8b6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.io.IWritable;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public interface IOperatorStats extends IWritable, Serializable {
+
+ /**
+ * @return The name of the operator
+ */
+ String getName();
+
+ /**
+ * @return A counter used to track the number of tuples
+ * accessed by an operator
+ */
+ ICounter getTupleCounter();
+
+ /**
+ * @return A counter used to track the execution time
+ * of an operator
+ */
+ ICounter getTimeCounter();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
new file mode 100644
index 0000000..1e73146
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IWritable;
+
+public interface IStatsCollector extends IWritable, Serializable {
+
+ /**
+ * Adds {@link IOperatorStats} to the stats collections
+ *
+ * @param operatorStats
+ * @throws HyracksDataException when an operator with the same was already added.
+ */
+ void add(IOperatorStats operatorStats) throws HyracksDataException;
+
+ /**
+ * @param operatorName
+ * @return {@link IOperatorStats} for the operator with name <code>operatorName</code>
+ * if one exists or else null.
+ */
+ IOperatorStats getOperatorStats(String operatorName);
+
+ /**
+ * @return A special {@link IOperatorStats} that has the aggregated stats
+ * from all operators in the collection.
+ */
+ IOperatorStats getAggregatedStats();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java
index 987b86b..e43a2e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java
@@ -18,13 +18,15 @@
*/
package org.apache.hyracks.api.job.profiling.counters;
-public interface ICounter {
+import java.io.Serializable;
+
+public interface ICounter extends Serializable {
/**
* Get the fully-qualified name of the counter.
*
* @return Name of the counter.
*/
- public String getName();
+ String getName();
/**
* Update the value of the counter to be current + delta.
@@ -33,7 +35,7 @@
* - Amount to change the counter value by.
* @return the new value after update.
*/
- public long update(long delta);
+ long update(long delta);
/**
* Set the value of the counter.
@@ -42,12 +44,12 @@
* - New value of the counter.
* @return Old value of the counter.
*/
- public long set(long value);
+ long set(long value);
/**
* Get the value of the counter.
*
* @return the value of the counter.
*/
- public long get();
+ long get();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
new file mode 100644
index 0000000..718ac3d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+import org.apache.hyracks.control.common.job.profiling.counters.Counter;
+
+public class OperatorStats implements IOperatorStats {
+
+ public final String operatorName;
+ public final ICounter tupleCounter;
+ public final ICounter timeCounter;
+
+ public OperatorStats(String operatorName) {
+ if (operatorName == null || operatorName.isEmpty()) {
+ throw new IllegalArgumentException("operatorName must not be null or empty");
+ }
+ this.operatorName = operatorName;
+ tupleCounter = new Counter("tupleCounter");
+ timeCounter = new Counter("timeCounter");
+ }
+
+ public static IOperatorStats create(DataInput input) throws IOException {
+ String name = input.readUTF();
+ OperatorStats operatorStats = new OperatorStats(name);
+ operatorStats.readFields(input);
+ return operatorStats;
+ }
+
+ @Override
+ public String getName() {
+ return operatorName;
+ }
+
+ @Override
+ public ICounter getTupleCounter() {
+ return tupleCounter;
+ }
+
+ @Override
+ public ICounter getTimeCounter() {
+ return timeCounter;
+ }
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeUTF(operatorName);
+ output.writeLong(tupleCounter.get());
+ output.writeLong(timeCounter.get());
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ tupleCounter.set(input.readLong());
+ timeCounter.set(input.readLong());
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
new file mode 100644
index 0000000..90cdc72
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+
+public class StatsCollector implements IStatsCollector {
+
+ private final Map<String, IOperatorStats> operatorStatsMap = new HashMap<>();
+
+ @Override
+ public void add(IOperatorStats operatorStats) throws HyracksDataException {
+ if (operatorStatsMap.containsKey(operatorStats.getName())) {
+ throw new IllegalArgumentException("Operator with the same name already exists");
+ }
+ operatorStatsMap.put(operatorStats.getName(), operatorStats);
+ }
+
+ @Override
+ public IOperatorStats getOperatorStats(String operatorName) {
+ return operatorStatsMap.get(operatorName);
+ }
+
+ public static StatsCollector create(DataInput input) throws IOException {
+ StatsCollector statsCollector = new StatsCollector();
+ statsCollector.readFields(input);
+ return statsCollector;
+ }
+
+ @Override
+ public IOperatorStats getAggregatedStats() {
+ IOperatorStats aggregatedStats = new OperatorStats("aggregated");
+ for (IOperatorStats stats : operatorStatsMap.values()) {
+ aggregatedStats.getTupleCounter().update(stats.getTupleCounter().get());
+ aggregatedStats.getTimeCounter().update(stats.getTupleCounter().get());
+ }
+ return aggregatedStats;
+ }
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeInt(operatorStatsMap.size());
+ for (IOperatorStats operatorStats : operatorStatsMap.values()) {
+ operatorStats.writeFields(output);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int operatorCount = input.readInt();
+ for (int i = 0; i < operatorCount; i++) {
+ IOperatorStats opStats = OperatorStats.create(input);
+ operatorStatsMap.put(opStats.getName(), opStats);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 680d2f9..3b54887 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -25,12 +25,15 @@
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
+import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
public class TaskProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
@@ -39,6 +42,8 @@
private Map<PartitionId, PartitionProfile> partitionSendProfile;
+ private IStatsCollector statsCollector;
+
public static TaskProfile create(DataInput dis) throws IOException {
TaskProfile taskProfile = new TaskProfile();
taskProfile.readFields(dis);
@@ -49,9 +54,11 @@
}
- public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) {
+ public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile,
+ IStatsCollector statsCollector) {
this.taskAttemptId = taskAttemptId;
- this.partitionSendProfile = new HashMap<PartitionId, PartitionProfile>(partitionSendProfile);
+ this.partitionSendProfile = new HashMap<>(partitionSendProfile);
+ this.statsCollector = statsCollector;
}
public TaskAttemptId getTaskId() {
@@ -104,17 +111,22 @@
return json;
}
+ public IStatsCollector getStatsCollector() {
+ return statsCollector;
+ }
+
@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
taskAttemptId = TaskAttemptId.create(input);
int size = input.readInt();
- partitionSendProfile = new HashMap<PartitionId, PartitionProfile>();
+ partitionSendProfile = new HashMap<>();
for (int i = 0; i < size; i++) {
PartitionId key = PartitionId.create(input);
PartitionProfile value = PartitionProfile.create(input);
partitionSendProfile.put(key, value);
}
+ statsCollector = StatsCollector.create(input);
}
@Override
@@ -126,5 +138,6 @@
entry.getKey().writeFields(output);
entry.getValue().writeFields(output);
}
+ statsCollector.writeFields(output);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index d7b4be0..6dc9619 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -53,6 +53,7 @@
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.PartitionState;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.common.job.profiling.counters.Counter;
import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -190,7 +191,7 @@
}
for (Task task : taskMap.values()) {
TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
- new Hashtable<>(task.getPartitionSendProfile()));
+ new Hashtable<>(task.getPartitionSendProfile()), new StatsCollector());
task.dumpProfile(taskProfile);
jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index bff2794..fcd4bde 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -53,12 +53,14 @@
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.job.PartitionState;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.common.job.profiling.counters.Counter;
import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -107,6 +109,8 @@
private final Set<JobFlag> jobFlags;
+ private final IStatsCollector statsCollector;
+
public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
ExecutorService executor, NodeControllerService ncs,
List<List<PartitionChannel>> inputChannelsFromConnectors) {
@@ -124,6 +128,7 @@
exceptions = new CopyOnWriteArrayList<>(); // Multiple threads could add exceptions to this list.
this.ncs = ncs;
this.inputChannelsFromConnectors = inputChannelsFromConnectors;
+ statsCollector = new StatsCollector();
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -453,4 +458,9 @@
public Set<JobFlag> getJobFlags() {
return jobFlags;
}
+
+ @Override
+ public IStatsCollector getStatsCollector() {
+ return statsCollector;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 7728d16..675926e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -38,8 +38,8 @@
@Override
public void run() {
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
- task.dumpProfile(taskProfile);
+ TaskProfile taskProfile =
+ new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(), task.getStatsCollector());
try {
ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
ncs.getId(), taskProfile);
@@ -53,4 +53,4 @@
public String toString() {
return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
index 526caa9..2b3f3cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
@@ -81,6 +81,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 07d07c3..1e3d5f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -30,6 +30,8 @@
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.control.common.job.profiling.OperatorStats;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -79,6 +81,7 @@
protected ArrayTupleBuilder nonFilterTupleBuild;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
protected boolean failed = false;
+ private final IOperatorStats stats;
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -105,6 +108,10 @@
maxFilterKey = new PermutingFrameTupleReference();
maxFilterKey.setFieldPermutation(maxFilterFieldIndexes);
}
+ stats = new OperatorStats(getDisplayName());
+ if (ctx.getStatsCollector() != null) {
+ ctx.getStatsCollector().add(stats);
+ }
}
protected abstract ISearchPredicate createSearchPredicate();
@@ -154,9 +161,9 @@
}
protected void writeSearchResults(int tupleIndex) throws Exception {
- boolean matched = false;
+ long matchingTupleCount = 0;
while (cursor.hasNext()) {
- matched = true;
+ matchingTupleCount++;
tb.reset();
cursor.next();
if (retainInput) {
@@ -174,8 +181,9 @@
}
FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
}
+ stats.getTupleCounter().update(matchingTupleCount);
- if (!matched && retainInput && retainMissing) {
+ if (matchingTupleCount == 0 && retainInput && retainMissing) {
FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex,
nonMatchTupleBuild.getFieldEndOffsets(), nonMatchTupleBuild.getByteArray(), 0,
nonMatchTupleBuild.getSize());
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index b100300..3d13cf9 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -36,8 +36,10 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
public class TestTaskContext implements IHyracksTaskContext {
@@ -46,6 +48,7 @@
private WorkspaceFileFactory fileFactory;
private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
private Object sharedObject;
+ private final IStatsCollector statsCollector = new StatsCollector();
public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
this.jobletContext = jobletContext;
@@ -163,4 +166,9 @@
public Set<JobFlag> getJobFlags() {
return EnumSet.noneOf(JobFlag.class);
}
+
+ @Override
+ public IStatsCollector getStatsCollector() {
+ return statsCollector;
+ }
}