[ASTERIXDB-2532][RT] per-operator profiling
Enables profiling in queries at the operator-level when the analyze
variable is set in a query.
Change-Id: Ie16f3901ae5b32920d8552d5fd1ec8bb6e2ec8ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3226
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 3718340..606abd2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.translator;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.rmi.RemoteException;
import java.util.ArrayList;
@@ -43,6 +46,10 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.ResultSetId;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
/**
* An interface that takes care of executing a list of statements that are submitted through an Asterix API
*/
@@ -78,11 +85,18 @@
}
class Stats implements Serializable {
- private static final long serialVersionUID = 5885273238208454610L;
+ private static final long serialVersionUID = 5885273238208454611L;
+
+ public enum ProfileType {
+ COUNTS,
+ FULL
+ }
private long count;
private long size;
private long processedObjects;
+ private Profile profile;
+ private ProfileType type;
private long diskIoCount;
private long totalWarningsCount;
@@ -125,6 +139,48 @@
public void setTotalWarningsCount(long totalWarningsCount) {
this.totalWarningsCount = totalWarningsCount;
}
+
+ public void setJobProfile(ObjectNode profile) {
+ this.profile = new Profile(profile);
+ }
+
+ public ObjectNode getJobProfile() {
+ return profile != null ? profile.getProfile() : null;
+ }
+
+ public ProfileType getType() {
+ return type;
+ }
+
+ public void setType(ProfileType type) {
+ this.type = type;
+ }
+ }
+
+ class Profile implements Serializable {
+ private transient ObjectNode profile;
+
+ public Profile(ObjectNode profile) {
+ this.profile = profile;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ ObjectMapper om = new ObjectMapper();
+ out.writeUTF(om.writeValueAsString(profile));
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ ObjectMapper om = new ObjectMapper();
+ JsonNode inNode = om.readTree(in.readUTF());
+ if (!inNode.isObject()) {
+ throw new IOException("Deserialization error");
+ }
+ profile = (ObjectNode) inNode;
+ }
+
+ public ObjectNode getProfile() {
+ return profile;
+ }
}
/**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
index a036696..9cf8e6e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
@@ -24,11 +24,14 @@
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.result.IResultMetadata;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class ResultMetadata implements IResultMetadata {
private final SessionConfig.OutputFormat format;
private long jobDuration;
private long processedObjects;
+ private ObjectNode profile;
private long diskIoCount;
private Set<Warning> warnings;
private long totalWarningsCount;
@@ -68,6 +71,14 @@
return jobDuration;
}
+ public void setJobProfile(ObjectNode profile) {
+ this.profile = profile;
+ }
+
+ public ObjectNode getJobProfile() {
+ return profile;
+ }
+
/**
* @return The reported warnings.
*/
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 27e685c..2c499c7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -92,7 +92,7 @@
new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
statementsText, sessionOutput.config(), resultProperties.getNcToCcResultProperties(),
param.getClientContextID(), handleUrl, optionalParameters, statementParameters,
- param.isMultiStatement(), stmtCategoryRestrictionMask, requestReference);
+ param.isMultiStatement(), param.isProfile(), stmtCategoryRestrictionMask, requestReference);
execution.start();
ncMb.sendMessageToPrimaryCC(requestMsg);
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 52c62af..4980f5e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -22,11 +22,12 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.api.common.ResultMetadata;
-import org.apache.asterix.app.result.ResponseMertics;
+import org.apache.asterix.app.result.ResponseMetrics;
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.result.fields.MetricsPrinter;
+import org.apache.asterix.app.result.fields.ProfilePrinter;
import org.apache.asterix.app.result.fields.ResultsPrinter;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -98,10 +99,13 @@
printer.begin();
printer.addResultPrinter(new ResultsPrinter(appCtx, resultReader, null, stats, sessionOutput));
printer.printResults();
- ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart,
+ ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart,
metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
metadata.getTotalWarningsCount(), metadata.getDiskIoCount());
- printer.addFooterPrinter(new MetricsPrinter(mertics, HttpUtil.getPreferredCharset(request)));
+ printer.addFooterPrinter(new MetricsPrinter(metrics, HttpUtil.getPreferredCharset(request)));
+ if (metadata.getJobProfile() != null) {
+ printer.addFooterPrinter(new ProfilePrinter(metadata.getJobProfile()));
+ }
printer.printFooters();
printer.end();
} else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
index 566133d..ff6dcbd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
@@ -50,6 +50,7 @@
private boolean logicalPlan;
private boolean optimizedLogicalPlan;
private boolean job;
+ private boolean profile;
private boolean signature;
private boolean multiStatement;
@@ -197,6 +198,14 @@
this.job = job;
}
+ public void setProfile(boolean profile) {
+ this.profile = profile;
+ }
+
+ public boolean isProfile() {
+ return profile;
+ }
+
public boolean isSignature() {
return signature;
}
@@ -230,6 +239,7 @@
object.put("logicalPlan", logicalPlan);
object.put("optimizedLogicalPlan", optimizedLogicalPlan);
object.put("job", job);
+ object.put("profile", profile);
object.put("signature", signature);
object.put("multiStatement", multiStatement);
object.put("parseOnly", parseOnly);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1e8f111..d193b13 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -45,13 +45,14 @@
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.app.result.ExecutionError;
import org.apache.asterix.app.result.ExecutionWarning;
-import org.apache.asterix.app.result.ResponseMertics;
+import org.apache.asterix.app.result.ResponseMetrics;
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.app.result.fields.ClientContextIdPrinter;
import org.apache.asterix.app.result.fields.ErrorsPrinter;
import org.apache.asterix.app.result.fields.MetricsPrinter;
import org.apache.asterix.app.result.fields.ParseOnlyResultPrinter;
import org.apache.asterix.app.result.fields.PlansPrinter;
+import org.apache.asterix.app.result.fields.ProfilePrinter;
import org.apache.asterix.app.result.fields.RequestIdPrinter;
import org.apache.asterix.app.result.fields.SignaturePrinter;
import org.apache.asterix.app.result.fields.StatusPrinter;
@@ -68,7 +69,6 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.aql.parser.TokenMgrError;
@@ -171,6 +171,7 @@
PARSE_ONLY("parse-only"),
READ_ONLY("readonly"),
JOB("job"),
+ PROFILE("profile"),
SIGNATURE("signature"),
MULTI_STATEMENT("multi-statement");
@@ -383,6 +384,7 @@
param.setReadOnly(getOptBoolean(jsonRequest, Parameter.READ_ONLY, false));
param.setOptimizedLogicalPlan(getOptBoolean(jsonRequest, Parameter.OPTIMIZED_LOGICAL_PLAN, false));
param.setJob(getOptBoolean(jsonRequest, Parameter.JOB, false));
+ param.setProfile(getOptBoolean(jsonRequest, Parameter.PROFILE, false));
param.setSignature(getOptBoolean(jsonRequest, Parameter.SIGNATURE, true));
param.setStatementParams(
getOptStatementParameters(jsonRequest, jsonRequest.fieldNames(), JsonNode::get, v -> v));
@@ -405,8 +407,15 @@
param.setTimeout(getParameter(request, Parameter.TIMEOUT));
param.setMaxResultReads(getParameter(request, Parameter.MAX_RESULT_READS));
param.setPlanFormat(getParameter(request, Parameter.PLAN_FORMAT));
+ param.setExpressionTree(getOptBoolean(request, Parameter.EXPRESSION_TREE, false));
+ param.setRewrittenExpressionTree(getOptBoolean(request, Parameter.REWRITTEN_EXPRESSION_TREE, false));
+ param.setLogicalPlan(getOptBoolean(request, Parameter.LOGICAL_PLAN, false));
param.setParseOnly(getOptBoolean(request, Parameter.PARSE_ONLY, false));
param.setReadOnly(getOptBoolean(request, Parameter.READ_ONLY, false));
+ param.setOptimizedLogicalPlan(getOptBoolean(request, Parameter.OPTIMIZED_LOGICAL_PLAN, false));
+ param.setJob(getOptBoolean(request, Parameter.JOB, false));
+ param.setProfile(getOptBoolean(request, Parameter.PROFILE, false));
+ param.setSignature(getOptBoolean(request, Parameter.SIGNATURE, true));
param.setMultiStatement(getOptBoolean(request, Parameter.MULTI_STATEMENT, true));
try {
param.setStatementParams(getOptStatementParameters(request, request.getParameterNames().iterator(),
@@ -509,6 +518,7 @@
.serializeParameterValues(param.getStatementParams());
setAccessControlHeaders(request, response);
response.setStatus(execution.getHttpStatus());
+ stats.setType(param.isProfile() ? Stats.ProfileType.FULL : Stats.ProfileType.COUNTS);
executeStatement(requestRef, statementsText, sessionOutput, resultProperties, stats, param, execution,
optionalParams, statementParams, responsePrinter, warnings);
}
@@ -561,15 +571,18 @@
// in case of ASYNC delivery, the status is printed by query translator
responsePrinter.addFooterPrinter(new StatusPrinter(execution.getResultStatus()));
}
- final ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart, execution.duration(),
+ final ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart, execution.duration(),
stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount,
stats.getTotalWarningsCount(), stats.getDiskIoCount());
- responsePrinter.addFooterPrinter(new MetricsPrinter(mertics, resultCharset));
+ responsePrinter.addFooterPrinter(new MetricsPrinter(metrics, resultCharset));
+ if (stats.getType() == Stats.ProfileType.FULL) {
+ responsePrinter.addFooterPrinter(new ProfilePrinter(stats.getJobProfile()));
+ }
}
protected void validateStatement(String statement) throws RuntimeDataException {
if (statement == null || statement.isEmpty()) {
- throw new RuntimeDataException(ErrorCode.NO_STATEMENT_PROVIDED);
+ throw new RuntimeDataException(NO_STATEMENT_PROVIDED);
}
}
@@ -581,8 +594,7 @@
Query query = (Query) stmts.get(stmts.size() - 1);
Set<VariableExpr> extVars =
compilationProvider.getRewriterFactory().createQueryRewriter().getExternalVariables(query.getBody());
- ResultUtil.ParseOnlyResult parseOnlyResult = new ResultUtil.ParseOnlyResult(extVars);
- return parseOnlyResult;
+ return new ResultUtil.ParseOnlyResult(extVars);
}
protected void executeStatement(IRequestReference requestReference, String statementsText,
@@ -697,10 +709,9 @@
public static String extractStatementParameterName(String name) {
int ln = name.length();
- if (ln > 1 && name.charAt(0) == '$' && Character.isLetter(name.charAt(1))) {
- if (ln == 2 || isStatementParameterNameRest(name, 2)) {
- return name.substring(1);
- }
+ if ((ln == 2 || isStatementParameterNameRest(name, 2)) && name.charAt(0) == '$'
+ && Character.isLetter(name.charAt(1))) {
+ return name.substring(1);
}
return null;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index b666085..4e2fea0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -19,6 +19,9 @@
package org.apache.asterix.app.message;
+import static org.apache.asterix.translator.IStatementExecutor.Stats.ProfileType.COUNTS;
+import static org.apache.asterix.translator.IStatementExecutor.Stats.ProfileType.FULL;
+
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashSet;
@@ -85,13 +88,14 @@
private final Map<String, byte[]> statementParameters;
private final boolean multiStatement;
private final int statementCategoryRestrictionMask;
+ private final boolean profile;
private final IRequestReference requestReference;
public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
String clientContextID, String handleUrl, Map<String, String> optionalParameters,
- Map<String, byte[]> statementParameters, boolean multiStatement, int statementCategoryRestrictionMask,
- IRequestReference requestReference) {
+ Map<String, byte[]> statementParameters, boolean multiStatement, boolean profile,
+ int statementCategoryRestrictionMask, IRequestReference requestReference) {
this.requestNodeId = requestNodeId;
this.requestMessageId = requestMessageId;
this.lang = lang;
@@ -104,6 +108,7 @@
this.statementParameters = statementParameters;
this.multiStatement = multiStatement;
this.statementCategoryRestrictionMask = statementCategoryRestrictionMask;
+ this.profile = profile;
this.requestReference = requestReference;
}
@@ -141,6 +146,7 @@
IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
compilationProvider, storageComponentProvider, new ResponsePrinter(sessionOutput));
final IStatementExecutor.Stats stats = new IStatementExecutor.Stats();
+ stats.setType(profile ? FULL : COUNTS);
Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters);
final IRequestParameters requestParameters = new RequestParameters(requestReference, statementsText, null,
resultProperties, stats, outMetadata, clientContextID, optionalParameters, stmtParams,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index 55c1f8e..43d7cd9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -26,6 +26,7 @@
import org.apache.asterix.api.common.ResultMetadata;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.api.result.ResultJobRecord;
@@ -39,6 +40,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class JobResultCallback implements IJobResultCallback {
private static final Logger LOGGER = LogManager.getLogger();
@@ -67,6 +70,16 @@
aggregateJobStats(jobId, metadata);
}
+ ObjectNode getProfile(JobId jobId) {
+ IJobManager jobManager =
+ ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
+ final JobRun run = jobManager.get(jobId);
+ if (run != null) {
+ return run.getJobProfile().toJSON();
+ }
+ return null;
+ }
+
private void aggregateJobStats(JobId jobId, ResultMetadata metadata) {
long processedObjects = 0;
long diskIoCount = 0;
@@ -99,5 +112,9 @@
metadata.setWarnings(AggregateWarnings);
metadata.setDiskIoCount(diskIoCount);
metadata.setTotalWarningsCount(aggregateTotalWarningsCount);
+ if (run.getFlags() != null && run.getFlags().contains(JobFlag.PROFILE_RUNTIME)) {
+ metadata.setJobProfile(getProfile(jobId));
+ }
}
+
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
similarity index 76%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
index 03a20a5..bc91d54 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.app.result;
-public class ResponseMertics {
+public class ResponseMetrics {
private long elapsedTime;
private long executionTime;
@@ -29,21 +29,21 @@
private long warnCount;
private long diskIoCount;
- private ResponseMertics() {
+ private ResponseMetrics() {
}
- public static ResponseMertics of(long elapsedTime, long executionTime, long resultCount, long resultSize,
+ public static ResponseMetrics of(long elapsedTime, long executionTime, long resultCount, long resultSize,
long processedObjects, long errorCount, long warnCount, long diskIoCount) {
- ResponseMertics mertics = new ResponseMertics();
- mertics.elapsedTime = elapsedTime;
- mertics.executionTime = executionTime;
- mertics.resultCount = resultCount;
- mertics.resultSize = resultSize;
- mertics.processedObjects = processedObjects;
- mertics.errorCount = errorCount;
- mertics.warnCount = warnCount;
- mertics.diskIoCount = diskIoCount;
- return mertics;
+ ResponseMetrics metrics = new ResponseMetrics();
+ metrics.elapsedTime = elapsedTime;
+ metrics.executionTime = executionTime;
+ metrics.resultCount = resultCount;
+ metrics.resultSize = resultSize;
+ metrics.processedObjects = processedObjects;
+ metrics.errorCount = errorCount;
+ metrics.warnCount = warnCount;
+ metrics.diskIoCount = diskIoCount;
+ return metrics;
}
public long getElapsedTime() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
index 117441e..21ffe0b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
@@ -23,7 +23,7 @@
import java.nio.charset.StandardCharsets;
import org.apache.asterix.api.http.server.ResultUtil;
-import org.apache.asterix.app.result.ResponseMertics;
+import org.apache.asterix.app.result.ResponseMetrics;
import org.apache.asterix.common.api.Duration;
import org.apache.asterix.common.api.IResponseFieldPrinter;
@@ -51,11 +51,11 @@
}
public static final String FIELD_NAME = "metrics";
- private final ResponseMertics mertics;
+ private final ResponseMetrics metrics;
private final Charset resultCharset;
- public MetricsPrinter(ResponseMertics mertics, Charset resultCharset) {
- this.mertics = mertics;
+ public MetricsPrinter(ResponseMetrics metrics, Charset resultCharset) {
+ this.metrics = metrics;
this.resultCharset = resultCharset;
}
@@ -67,35 +67,35 @@
pw.print(FIELD_NAME);
pw.print("\": {\n");
pw.print("\t");
- ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(mertics.getElapsedTime(), useAscii));
+ ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(metrics.getElapsedTime(), useAscii));
pw.print("\n\t");
ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(),
- Duration.formatNanos(mertics.getExecutionTime(), useAscii));
+ Duration.formatNanos(metrics.getExecutionTime(), useAscii));
pw.print("\n\t");
- ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), mertics.getResultCount(), true);
+ ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), metrics.getResultCount(), true);
pw.print("\n\t");
- ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), mertics.getResultSize(), true);
+ ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), metrics.getResultSize(), true);
pw.print("\n\t");
- final boolean hasErrors = mertics.getErrorCount() > 0;
- final boolean hasWarnings = mertics.getWarnCount() > 0;
- final boolean hasDiskIoStats = mertics.getDiskIoCount() > 0;
- ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), mertics.getProcessedObjects(),
+ final boolean hasErrors = metrics.getErrorCount() > 0;
+ final boolean hasWarnings = metrics.getWarnCount() > 0;
+ final boolean hasDiskIoStats = metrics.getDiskIoCount() > 0;
+ ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), metrics.getProcessedObjects(),
hasWarnings || hasErrors || hasDiskIoStats);
pw.print("\n");
//TODO move diskIoCount to the profile printer when it is introduced
if (hasDiskIoStats) {
pw.print("\t");
- ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), mertics.getDiskIoCount(), hasWarnings || hasErrors);
+ ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), metrics.getDiskIoCount(), hasWarnings || hasErrors);
pw.print("\n");
}
if (hasWarnings) {
pw.print("\t");
- ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), mertics.getWarnCount(), hasErrors);
+ ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), metrics.getWarnCount(), hasErrors);
pw.print("\n");
}
if (hasErrors) {
pw.print("\t");
- ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), mertics.getErrorCount(), false);
+ ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), metrics.getErrorCount(), false);
pw.print("\n");
}
pw.print("\t}");
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
index 90b0656..c37db07 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
@@ -61,9 +61,11 @@
IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
List<Triple<JobId, ResultSetId, ARecordType>> resultSets = resultMetadata.getResultSets();
if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultSets.isEmpty()) {
- stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects());
- stats.setDiskIoCount(responseMsg.getStats().getDiskIoCount());
- stats.setTotalWarningsCount(responseMsg.getStats().getTotalWarningsCount());
+ IStatementExecutor.Stats responseStats = responseMsg.getStats();
+ stats.setJobProfile(responseStats.getJobProfile());
+ stats.setProcessedObjects(responseStats.getProcessedObjects());
+ stats.setDiskIoCount(responseStats.getDiskIoCount());
+ stats.setTotalWarningsCount(responseStats.getTotalWarningsCount());
for (int i = 0; i < resultSets.size(); i++) {
Triple<JobId, ResultSetId, ARecordType> rsmd = resultSets.get(i);
ResultReader resultReader = new ResultReader(resultSet, rsmd.getLeft(), rsmd.getMiddle());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java
new file mode 100644
index 0000000..d74d25c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result.fields;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.asterix.common.api.IResponseFieldPrinter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.util.DefaultIndenter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class ProfilePrinter implements IResponseFieldPrinter {
+
+ public static final String FIELD_NAME = "profile";
+ private final ObjectNode profile;
+ private final Logger LOGGER = LogManager.getLogger();
+
+ public ProfilePrinter(ObjectNode profile) {
+ this.profile = profile;
+ }
+
+ @Override
+ public void print(PrintWriter pw) {
+ boolean hasProfile = profile != null;
+ if (hasProfile) {
+ try {
+ pw.print("\t\"" + FIELD_NAME + "\" : ");
+ ObjectMapper om = new ObjectMapper();
+ om.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+ DefaultIndenter ind = new DefaultIndenter("\t", DefaultIndenter.SYS_LF) {
+ @Override
+ public void writeIndentation(JsonGenerator jg, int level) throws IOException {
+ super.writeIndentation(jg, level + 1);
+ }
+ };
+ DefaultPrettyPrinter pp = new DefaultPrettyPrinter();
+ pp = pp.withArrayIndenter(ind);
+ pp = pp.withObjectIndenter(ind);
+ om.writer(pp).writeValue(pw, profile);
+ } catch (IOException e) {
+ LOGGER.error("Unable to print job profile", e);
+
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return FIELD_NAME;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1838c4c..a53ca88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -391,6 +391,9 @@
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
metadataProvider.setMaxResultReads(maxResultReads);
+ if (stats.getType() == Stats.ProfileType.FULL) {
+ this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
+ }
handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats,
requestParameters, stmtParams, stmtRewriter);
break;
@@ -2549,6 +2552,9 @@
(org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService()
.getResultMetadata(jobId, rsId);
stats.setProcessedObjects(resultMetadata.getProcessedObjects());
+ if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+ stats.setJobProfile(resultMetadata.getJobProfile());
+ }
stats.setDiskIoCount(resultMetadata.getDiskIoCount());
stats.setTotalWarningsCount(resultMetadata.getTotalWarningsCount());
warningCollector.warn(resultMetadata.getWarnings());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index eb708ce..60d44a1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -51,6 +51,7 @@
RESULTS("results"),
REQUEST_ID("requestID"),
METRICS("metrics"),
+ PROFILE("profile"),
CLIENT_CONTEXT_ID("clientContextID"),
SIGNATURE("signature"),
STATUS("status"),
@@ -93,6 +94,10 @@
return extract(resultStream, EnumSet.of(ResultField.METRICS), resultCharset).getResult();
}
+ public static InputStream extractProfile(InputStream resultStream, Charset resultCharset) throws Exception {
+ return extract(resultStream, EnumSet.of(ResultField.PROFILE), resultCharset).getResult();
+ }
+
public static InputStream extractPlans(InputStream resultStream, Charset resultCharset) throws Exception {
return extract(resultStream, EnumSet.of(ResultField.PLANS), resultCharset).getResult();
}
@@ -167,6 +172,7 @@
break;
case REQUEST_ID:
case METRICS:
+ case PROFILE:
case CLIENT_CONTEXT_ID:
case SIGNATURE:
case STATUS:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 55b8146..7319ab5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -164,6 +164,7 @@
public static final String DELIVERY_IMMEDIATE = "immediate";
public static final String DIAGNOSE = "diagnose";
private static final String METRICS_QUERY_TYPE = "metrics";
+ private static final String PROFILE_QUERY_TYPE = "profile";
private static final String PLANS_QUERY_TYPE = "plans";
private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
@@ -951,6 +952,7 @@
case "parse":
case "deferred":
case "metrics":
+ case "profile":
case "plans":
// isDmlRecoveryTest: insert Crash and Recovery
if (isDmlRecoveryTest) {
@@ -1298,6 +1300,9 @@
case METRICS_QUERY_TYPE:
resultStream = ResultExtractor.extractMetrics(resultStream, responseCharset);
break;
+ case PROFILE_QUERY_TYPE:
+ resultStream = ResultExtractor.extractProfile(resultStream, responseCharset);
+ break;
case PLANS_QUERY_TYPE:
resultStream = ResultExtractor.extractPlans(resultStream, responseCharset);
break;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
index 0cfeee7..7695698 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
@@ -218,4 +218,16 @@
objectMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
return objectMapper;
}
+
+ public static void main(String[] args) throws Exception {
+ ObjectMapper om = createObjectMapper();
+ String patternFile = args[0];
+ String instanceFile = args[1];
+ if (equalJson(om.readTree(new File(patternFile)), om.readTree(new File(instanceFile)))) {
+ System.out.println(instanceFile + " matches " + patternFile);
+ } else {
+ System.out.println(instanceFile + " does not match " + patternFile);
+ System.exit(1);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
new file mode 100644
index 0000000..7c945fc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the cluster state runtime tests with the storage parallelism.
+ */
+@RunWith(Parameterized.class)
+public class ProfiledExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "ProfiledExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.buildTestsInXml("profiled.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public ProfiledExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-single.conf b/asterixdb/asterix-app/src/test/resources/cc-single.conf
new file mode 100644
index 0000000..419abf7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-single.conf
@@ -0,0 +1,50 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.pagesize=32KB
+storage.buffercache.size=48MB
+storage.memorycomponent.globalbudget=512MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+messaging.frame.size=4096
+messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
new file mode 100644
index 0000000..496927e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+ QueryFileExtension=".sqlpp">
+ <test-group name="profile">
+ <test-case FilePath="profile">
+ <compilation-unit name="full-scan">
+ <parameter name="profile" value="true" />
+ <output-dir compare="Text">full-scan</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..441bee4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp
new file mode 100644
index 0000000..2c5d5d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp
new file mode 100644
index 0000000..9db1d12
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp
new file mode 100644
index 0000000..4cccd9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
new file mode 100644
index 0000000..82e7128
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -0,0 +1,64 @@
+{
+ "job-id": "R{[A-Z0-9.:]+}",
+ "counters": [],
+ "joblets": [
+ {
+ "node-id": "R{.+}",
+ "counters": [],
+ "tasks": [
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [
+ {
+ "partition-id": {
+ "job-id": "R{[A-Z0-9.:]+}",
+ "connector-id": "R{[A-Z0-9.:]+}",
+ "sender-index": "R{[0-9]+}",
+ "receiver-index": "R{[0-9]+}"
+ },
+ "open-time": "R{[0-9]+}",
+ "close-time": "R{[0-9]+}",
+ "offset": "R{[0-9]+}",
+ "frame-times": [
+ 0
+ ],
+ "resolution": 1
+ }
+ ],
+ "counters": [
+ {
+ "name": "Empty Tuple Source",
+ "time": "R{[0-9.]+}"
+ },
+ {
+ "name": "Index Search",
+ "time": "R{[0-9.]+}"
+ },
+ {
+ "name": "R{.+}",
+ "time": "R{[0-9.]+}"
+ }
+ ]
+ },
+ {
+ "activity-id": "R{[A-Z0-9.:]+}",
+ "partition": "R{[0-9]+}",
+ "attempt": "R{[0-9]+}",
+ "partition-send-profile": [],
+ "counters": [
+ {
+ "name": "R{.+}",
+ "time": "R{[0-9.]+}"
+ },
+ {
+ "name": "Result Writer",
+ "time": "R{[0-9.]+}"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
index 6d648b1..ea29643 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
@@ -20,9 +20,7 @@
package org.apache.asterix.builders;
import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
@@ -55,7 +53,7 @@
* The field name.
* @param value
* The field value.
- * @throws AsterixException
+ * @throws HyracksDataException
* if the field name conflicts with a closed field name
*/
public void addField(IValueReference name, IValueReference value) throws HyracksDataException;
@@ -66,8 +64,7 @@
* @param writeTypeTag
* Whether to write a typetag as part of the record's serialized
* representation.
- * @throws IOException
- * @throws AsterixException
+ * @throws HyracksDataException
* if any open field names conflict with each other
*/
public void write(DataOutput out, boolean writeTypeTag) throws HyracksDataException;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java
index 2977470..a98e2dd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java
@@ -118,4 +118,9 @@
}
};
}
+
+ @Override
+ public String toString() {
+ return "Open Record Constructor";
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 7887dde..07d4e94 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
+import org.apache.hyracks.api.dataflow.TimedFrameWriter;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
@@ -60,15 +61,19 @@
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
final IFrameWriter writer, long memoryBudget) throws HyracksDataException {
+ final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+ final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
final RunningAggregatorOutput outputWriter =
new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length + decorFieldIdx.length, writer);
- // should enforce protocol
- boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
- IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
+ IFrameWriter fw = outputWriter;
+ if (profile) {
+ fw = TimedFrameWriter.time(outputWriter, ctx, "Aggregate Writer");
+ } else if (enforce) {
+ fw = EnforceFrameWriter.enforce(outputWriter);
+ }
final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
- pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter,
- ctx, null);
+ pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], fw, ctx, null);
}
final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 07365db..b3fef7f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -121,6 +121,11 @@
throw exception;
}
}
+
+ @Override
+ public String getDisplayName() {
+ return "Empty Tuple Source";
+ }
}
private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index f5477ec..81f5d08 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -56,13 +56,13 @@
public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
// should enforce protocol
boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+ boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
// plug the operators
IFrameWriter start = writer;// this.writer;
IPushRuntimeFactory[] runtimeFactories = pipeline.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- start = enforce ? EnforceFrameWriter.enforce(start) : start;
-
+ start = (enforce && !profile) ? EnforceFrameWriter.enforce(start) : start;
IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
for (int j = 0; j < newRuntimes.length; j++) {
@@ -99,12 +99,13 @@
IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException {
// should enforce protocol
boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+ boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
// plug the operators
IFrameWriter start = writer;
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- start = enforce ? EnforceFrameWriter.enforce(start) : start;
+ start = (enforce && !profile) ? EnforceFrameWriter.enforce(start) : start;
IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
IPushRuntime newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntimes[0]) : newRuntimes[0];
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 3cdc5aed..5343069 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -79,7 +79,7 @@
if (i > 0) {
sb.append(", ");
}
- sb.append(evalFactories[i]);
+ sb.append(evalFactories[i].toString());
}
sb.append("]");
return sb.toString();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java
similarity index 94%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java
index 0bcb246..0490576 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.job.profiling.counters;
+package org.apache.hyracks.api.com.job.profiling.counters;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
+@SuppressWarnings("squid:S1700")
public class Counter implements ICounter {
private static final long serialVersionUID = -3935601595055562080L;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
new file mode 100644
index 0000000..6afbccb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+public interface IPassableTimer {
+ /*
+ A timer intended to be used for timing the individual components of a
+ pipelined process. An instance of IPassableTimer is held by each method
+ in the pipeline, and is paused() when that method passes off control to
+ a component above it, and is resume()d when the component above it returns.
+ */
+
+ void pause();
+
+ void resume();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java
new file mode 100644
index 0000000..83a4b34
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public class TimedFrameWriter implements IFrameWriter, IPassableTimer {
+
+ // The downstream data consumer of this writer.
+ private final IFrameWriter writer;
+ private long frameStart = 0;
+ final ICounter counter;
+ final IStatsCollector collector;
+ final String name;
+
+ public TimedFrameWriter(IFrameWriter writer, IStatsCollector collector, String name, ICounter counter) {
+ this.writer = writer;
+ this.collector = collector;
+ this.name = name;
+ this.counter = counter;
+ }
+
+ protected TimedFrameWriter(IFrameWriter writer, IStatsCollector collector, String name) {
+ this(writer, collector, name, collector.getOrAddOperatorStats(name).getTimeCounter());
+ }
+
+ @Override
+ public final void open() throws HyracksDataException {
+ try {
+ startClock();
+ writer.open();
+ } finally {
+ stopClock();
+ }
+ }
+
+ @Override
+ public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ try {
+ startClock();
+ writer.nextFrame(buffer);
+ } finally {
+ stopClock();
+ }
+ }
+
+ @Override
+ public final void flush() throws HyracksDataException {
+ try {
+ startClock();
+ writer.flush();
+ } finally {
+ stopClock();
+ }
+ }
+
+ @Override
+ public final void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ startClock();
+ writer.close();
+ } finally {
+ stopClock();
+ }
+ }
+
+ private void stopClock() {
+ pause();
+ collector.giveClock(this);
+ }
+
+ private void startClock() {
+ if (frameStart > 0) {
+ return;
+ }
+ frameStart = collector.takeClock(this);
+ }
+
+ @Override
+ public void resume() {
+ if (frameStart > 0) {
+ return;
+ }
+ long nt = System.nanoTime();
+ frameStart = nt;
+ }
+
+ @Override
+ public void pause() {
+ if (frameStart > 1) {
+ long nt = System.nanoTime();
+ long delta = nt - frameStart;
+ counter.update(delta);
+ frameStart = -1;
+ }
+ }
+
+ public static IFrameWriter time(IFrameWriter writer, IHyracksTaskContext ctx, String name)
+ throws HyracksDataException {
+ return writer instanceof TimedFrameWriter ? writer
+ : new TimedFrameWriter(writer, ctx.getStatsCollector(), name);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
new file mode 100644
index 0000000..2d46bea
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.util.HashMap;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
+
+public class TimedOperatorNodePushable extends TimedFrameWriter implements IOperatorNodePushable, IPassableTimer {
+
+ IOperatorNodePushable op;
+ HashMap<Integer, IFrameWriter> inputs;
+ long frameStart;
+
+ TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) throws HyracksDataException {
+ super(null, collector, op.getDisplayName());
+ this.op = op;
+ inputs = new HashMap<>();
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ synchronized (collector) {
+ startClock();
+ op.initialize();
+ stopClock();
+ }
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ synchronized (collector) {
+ startClock();
+ op.deinitialize();
+ stopClock();
+ }
+ }
+
+ @Override
+ public int getInputArity() {
+ return op.getInputArity();
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+ throws HyracksDataException {
+ op.setOutputFrameWriter(index, writer, recordDesc);
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ IFrameWriter ifw = op.getInputFrameWriter(index);
+ if (!(op instanceof TimedFrameWriter) && ifw.equals(op)) {
+ return new TimedFrameWriter(op.getInputFrameWriter(index), collector, op.getDisplayName(), counter);
+ }
+ return op.getInputFrameWriter(index);
+ }
+
+ @Override
+ public String getDisplayName() {
+ return op.getDisplayName();
+ }
+
+ private void stopClock() {
+ pause();
+ collector.giveClock(this);
+ }
+
+ private void startClock() {
+ if (frameStart > 0) {
+ return;
+ }
+ frameStart = collector.takeClock(this);
+ }
+
+ @Override
+ public void resume() {
+ if (frameStart > 0) {
+ return;
+ }
+ long nt = System.nanoTime();
+ frameStart = nt;
+ }
+
+ @Override
+ public void pause() {
+ if (frameStart > 0) {
+ long nt = System.nanoTime();
+ long delta = nt - frameStart;
+ counter.update(delta);
+ frameStart = -1;
+ }
+ }
+
+ public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ if (!(op instanceof TimedOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
+ return new TimedOperatorNodePushable(op, ctx.getStatsCollector());
+ }
+ return op;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
index 1e73146..8930d34 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -19,7 +19,9 @@
package org.apache.hyracks.api.job.profiling;
import java.io.Serializable;
+import java.util.Map;
+import org.apache.hyracks.api.dataflow.IPassableTimer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IWritable;
@@ -36,13 +38,34 @@
/**
* @param operatorName
* @return {@link IOperatorStats} for the operator with name <code>operatorName</code>
- * if one exists or else null.
+ * if it already exists, and adds it if it does not.
*/
- IOperatorStats getOperatorStats(String operatorName);
+ IOperatorStats getOrAddOperatorStats(String operatorName);
+
+ /**
+ * Get every registered operator stats object
+ * @return All registered operators, and their collected stats, with the names as keys and stats as values
+ */
+ Map<String, IOperatorStats> getAllOperatorStats();
/**
* @return A special {@link IOperatorStats} that has the aggregated stats
* from all operators in the collection.
*/
IOperatorStats getAggregatedStats();
+
+ /**
+ * Pause an operator's timer, to pass it to another operator
+ * @param newHolder the timer that is starting execution
+ * @return the current nanoTime when the clock was taken from the other operator
+ */
+ long takeClock(IPassableTimer newHolder);
+
+ /**
+ * Resume an operator's timer, when a downstream operator has finished execution of
+ * the method the upstream operator called
+ * @param currHolder the timer that needs to be paused
+ */
+ void giveClock(IPassableTimer currHolder);
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
similarity index 87%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index 66fb797..08c1adc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -16,15 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.job.profiling;
+package org.apache.hyracks.api.job.profiling;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
public class OperatorStats implements IOperatorStats {
private static final long serialVersionUID = 6401830963367567167L;
@@ -85,4 +84,10 @@
timeCounter.set(input.readLong());
diskIoCounter.set(input.readLong());
}
+
+ @Override
+ public String toString() {
+ return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": "
+ + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + " }";
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 12d696e..168d7dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -22,11 +22,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -41,6 +43,7 @@
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TimedOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -94,15 +97,22 @@
}
private void init() throws HyracksDataException {
- Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
+ LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
List<IConnectorDescriptor> outputConnectors;
final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+ final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
/*
* Set up the source operators
*/
+ Set<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> sources = new HashSet<>();
for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
- IOperatorNodePushable opPushable =
- entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ IOperatorNodePushable opPushable = null;
+ if (profile) {
+ opPushable = TimedOperatorNodePushable
+ .time(entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions), ctx);
+ } else {
+ opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ }
operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
inputArity += opPushable.getInputArity();
@@ -110,6 +120,7 @@
MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList());
for (IConnectorDescriptor conn : outputConnectors) {
childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+ sources.add(childQueue.peekLast());
}
}
@@ -128,8 +139,13 @@
IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
IOperatorNodePushable destOp = operatorNodePushables.get(destId);
if (destOp == null) {
- destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
- nPartitions);
+ if (profile) {
+ destOp = TimedOperatorNodePushable.time(channel.getRight().getLeft().createPushRuntime(ctx,
+ recordDescProvider, partition, nPartitions), ctx);
+ } else {
+ destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
+ nPartitions);
+ }
operatorNodePushablesBFSOrder.add(destOp);
operatorNodePushables.put(destId, destOp);
}
@@ -138,7 +154,7 @@
* construct the dataflow connection from a producer to a consumer
*/
IFrameWriter writer = destOp.getInputFrameWriter(inputChannel);
- writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+ writer = (enforce && !profile) ? EnforceFrameWriter.enforce(writer) : writer;
sourceOp.setOutputFrameWriter(outputChannel, writer,
recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java
index 6cb90e3..1a878a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.client.stats;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
public class AggregateCounter extends Counter {
private static final long serialVersionUID = 9140555872026977436L;
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java
index 0d3affc..84831e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java
@@ -30,11 +30,11 @@
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.client.stats.AggregateCounter;
import org.apache.hyracks.client.stats.Counters;
import org.apache.hyracks.client.stats.IClusterCounterContext;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 8da19ac..878d7c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -21,20 +21,25 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.dataflow.IPassableTimer;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
public class StatsCollector implements IStatsCollector {
- private static final long serialVersionUID = 6858817639895434577L;
+ private static final long serialVersionUID = 6858817639895434578L;
- private final Map<String, IOperatorStats> operatorStatsMap = new HashMap<>();
+ private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
+ private transient Deque<IPassableTimer> clockHolder = new ArrayDeque<>();
@Override
- public void add(IOperatorStats operatorStats) throws HyracksDataException {
+ public void add(IOperatorStats operatorStats) {
if (operatorStatsMap.containsKey(operatorStats.getName())) {
throw new IllegalArgumentException("Operator with the same name already exists");
}
@@ -42,8 +47,13 @@
}
@Override
- public IOperatorStats getOperatorStats(String operatorName) {
- return operatorStatsMap.get(operatorName);
+ public IOperatorStats getOrAddOperatorStats(String operatorName) {
+ return operatorStatsMap.computeIfAbsent(operatorName, OperatorStats::new);
+ }
+
+ @Override
+ public Map<String, IOperatorStats> getAllOperatorStats() {
+ return Collections.unmodifiableMap(operatorStatsMap);
}
public static StatsCollector create(DataInput input) throws IOException {
@@ -79,4 +89,24 @@
operatorStatsMap.put(opStats.getName(), opStats);
}
}
+
+ @Override
+ public long takeClock(IPassableTimer newHolder) {
+ if (newHolder != null) {
+ if (clockHolder.peek() != null) {
+ clockHolder.peek().pause();
+ }
+ clockHolder.push(newHolder);
+ }
+ return System.nanoTime();
+ }
+
+ @Override
+ public void giveClock(IPassableTimer currHolder) {
+ clockHolder.removeLastOccurrence(currHolder);
+ if (clockHolder.peek() != null) {
+ clockHolder.peek().resume();
+ }
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 6dfb71d..1e7a3b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -21,6 +21,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -29,6 +30,7 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
@@ -116,10 +118,24 @@
json.set("partition-send-profile", pspArray);
}
populateCounters(json);
-
return json;
}
+ @Override
+ protected void populateCounters(ObjectNode json) {
+ ObjectMapper om = new ObjectMapper();
+ Map<String, IOperatorStats> opTimes = statsCollector.getAllOperatorStats();
+ ArrayNode countersObj = om.createArrayNode();
+ opTimes.forEach((key, value) -> {
+ ObjectNode jpe = om.createObjectNode();
+ jpe.put("name", key);
+ jpe.put("time", Double
+ .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
+ countersObj.add(jpe);
+ });
+ json.set("counters", countersObj);
+ }
+
public IStatsCollector getStatsCollector() {
return statsCollector;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8afdc9e..d9b3961 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.PartitionChannel;
import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -53,7 +54,6 @@
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 81b9d5e..00de038 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -36,6 +36,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.comm.IFrameReader;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
@@ -66,7 +67,6 @@
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 09a4c18..69f1113 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -169,6 +169,7 @@
List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
if (outputs != null) {
final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
+ final boolean profile = flags.contains(JobFlag.PROFILE_RUNTIME);
for (int i = 0; i < outputs.size(); ++i) {
final IConnectorDescriptor conn = outputs.get(i);
RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
@@ -179,7 +180,7 @@
LOGGER.trace("input: {}: {}", i, conn.getConnectorId());
IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
- writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+ writer = (enforce && !profile) ? EnforceFrameWriter.enforce(writer) : writer;
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 7ca01a5..c545e7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -30,12 +30,14 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.IRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.TimedRunGenerator;
/**
* This Operator pushes group-by aggregation into the external sort.
@@ -122,7 +124,7 @@
RecordDescriptor outRecordDesc, boolean finalStage) {
super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, outRecordDesc);
if (framesLimit <= 1) {
- throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
+ throw new IllegalStateException(); // minimum of 2 frames (1 in,1 out)
}
this.groupFields = groupFields;
@@ -139,12 +141,14 @@
private static final long serialVersionUID = 1L;
@Override
- protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+ protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
- return new ExternalSortGroupByRunGenerator(ctx, sortFields,
+ final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
+ IRunGenerator runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
partialAggRecordDesc, ALG);
+ return profile ? TimedRunGenerator.time(runGen, ctx, "GroupBy (Sort Runs)") : runGen;
}
};
}
@@ -165,4 +169,14 @@
}
};
}
+
+ @Override
+ public String getDisplayName() {
+ return "GroupBy (Sort)";
+ }
+
+ @Override
+ public String toString() {
+ return getDisplayName();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 2fd17da..6d2b485 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -320,6 +320,11 @@
isFailed = true;
}
+ @Override
+ public String getDisplayName() {
+ return "Hybrid Hash Join: Build";
+ }
+
};
}
}
@@ -800,6 +805,11 @@
outerReader.close();
}
}
+
+ @Override
+ public String getDisplayName() {
+ return "Hybrid Hash Join: Probe & Join";
+ }
};
return op;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 8006fc6..a4cfa13 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -146,6 +146,11 @@
sb.append("\"maxReads\": ").append(maxReads).append(" }");
return sb.toString();
}
+
+ @Override
+ public String getDisplayName() {
+ return "Result Writer";
+ }
};
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 6abc064..6ff79b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -102,14 +102,14 @@
super(id);
}
- protected abstract AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+ protected abstract IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) throws HyracksDataException;
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private AbstractSortRunGenerator runGen;
+ private IRunGenerator runGen;
@Override
public void open() throws HyracksDataException {
@@ -139,6 +139,11 @@
public void fail() throws HyracksDataException {
runGen.fail();
}
+
+ @Override
+ public String getDisplayName() {
+ return "Sort (Run Generation)";
+ }
};
}
}
@@ -155,6 +160,7 @@
IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames);
@Override
+ @SuppressWarnings("squid:S1188")
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -203,6 +209,11 @@
}
}
}
+
+ @Override
+ public String getDisplayName() {
+ return "Sort (Run Merge)";
+ }
};
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 8b80a26..654f3a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
@@ -76,10 +77,12 @@
private static final long serialVersionUID = 1L;
@Override
- protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+ protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
- return new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, comparatorFactories,
- outRecDescs[0], alg, policy, framesLimit, outputLimit);
+ final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
+ IRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories,
+ comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit);
+ return profile ? TimedRunGenerator.time(runGen, ctx, "ExternalSort(Sort)") : runGen;
}
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java
index 02880dd..d43d02d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -33,4 +33,6 @@
* @return the list of generated (sorted) runs
*/
List<GeneratedRunFileReader> getRuns();
+
+ ISorter getSorter();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java
new file mode 100644
index 0000000..b3a4aee
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.sort;
+
+import java.util.List;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TimedFrameWriter;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+
+public class TimedRunGenerator extends TimedFrameWriter implements IRunGenerator {
+
+ private final IRunGenerator runGenerator;
+
+ private TimedRunGenerator(IRunGenerator runGenerator, IStatsCollector collector, String name) {
+ super(runGenerator, collector, name);
+ this.runGenerator = runGenerator;
+ }
+
+ @Override
+ public List<GeneratedRunFileReader> getRuns() {
+ return runGenerator.getRuns();
+ }
+
+ @Override
+ public ISorter getSorter() {
+ return runGenerator.getSorter();
+ }
+
+ public static IRunGenerator time(IRunGenerator runGenerator, IHyracksTaskContext ctx, String name) {
+ return runGenerator instanceof TimedRunGenerator ? runGenerator
+ : new TimedRunGenerator(runGenerator, ctx.getStatsCollector(), name);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index b29057f..b7ff530 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
@@ -59,10 +60,12 @@
private static final long serialVersionUID = 1L;
@Override
- protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+ protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider) {
- return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, keyNormalizerFactories,
- comparatorFactories, outRecDescs[0]);
+ final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
+ IRunGenerator runGen = new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields,
+ keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
+ return profile ? TimedRunGenerator.time(runGen, ctx, "TopKSort (Sort)") : runGen;
}
};
@@ -82,4 +85,10 @@
}
};
}
+
+ @Override
+ public String getDisplayName() {
+ return "Top K Sort";
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index fd985db..6f59ed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -27,7 +27,6 @@
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,7 +37,6 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.IResultSet;
@@ -138,7 +136,7 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.info(spec.toJSON().asText());
}
- JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(jobId.toString());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 566d8e2..001e250 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -101,4 +101,9 @@
searchCallbackProceedResultTrueValue);
}
+ @Override
+ public String getDisplayName() {
+ return "BTree Search";
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index 12dc310..44af086 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.test.CountAndThrowError;
import org.apache.hyracks.api.test.CountAndThrowException;
import org.apache.hyracks.api.test.CountAnswer;
@@ -307,6 +308,7 @@
IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
INCServiceContext serviceCtx = Mockito.mock(INCServiceContext.class);
+ IStatsCollector collector = Mockito.mock(IStatsCollector.class);
Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer());
Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer());
Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE);
@@ -314,6 +316,7 @@
.thenReturn(mockByteBuffer());
Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
Mockito.when(jobletCtx.getServiceContext()).thenReturn(serviceCtx);
+ Mockito.when(ctx.getStatsCollector()).thenReturn(collector);
return new IHyracksTaskContext[] { ctx };
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
index 262570a..31463b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
@@ -92,11 +92,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-control-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index d504a96..142e879 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -32,7 +32,6 @@
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.control.common.job.profiling.OperatorStats;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -93,7 +92,7 @@
protected ArrayTupleBuilder nonFilterTupleBuild;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
protected boolean failed = false;
- private final IOperatorStats stats;
+ private IOperatorStats stats;
// Used when the result of the search operation callback needs to be passed.
protected boolean appendSearchCallbackProceedResult;
@@ -150,13 +149,13 @@
this.appendSearchCallbackProceedResult = appendSearchCallbackProceedResult;
this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
- stats = new OperatorStats(getDisplayName());
- if (ctx.getStatsCollector() != null) {
- ctx.getStatsCollector().add(stats);
- }
this.tupleFilterFactory = tupleFactoryFactory;
this.outputLimit = outputLimit;
+ if (ctx != null && ctx.getStatsCollector() != null) {
+ stats = ctx.getStatsCollector().getOrAddOperatorStats(getDisplayName());
+ }
+
if (this.tupleFilterFactory != null && this.retainMissing) {
throw new IllegalStateException("RetainMissing with tuple filter is not supported");
}
@@ -435,4 +434,9 @@
}
+ @Override
+ public String getDisplayName() {
+ return "Index Search";
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java
index 7cbf948..267c69e 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java
@@ -21,9 +21,9 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
public class CounterContext implements ICounterContext {
private final String contextName;