[ASTERIXDB-2532][RT] per-operator profiling

Enables profiling of queries at the operator level when the profile
parameter is set in the query request.
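
For illustration, a minimal sketch of how the new parameter could be
exercised against the HTTP query service (the /query/service path, the
19002 CC API port, and the java.net.http client shown here are assumptions
for the example, not part of this change):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ProfileRequestExample {
        public static void main(String[] args) throws Exception {
            // "profile": true asks the servlet to run the job with
            // JobFlag.PROFILE_RUNTIME and to append a "profile" footer
            // (rendered by ProfilePrinter) next to the usual "metrics".
            String body = "{\"statement\": \"select 1;\", \"profile\": true}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:19002/query/service"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
        }
    }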

Change-Id: Ie16f3901ae5b32920d8552d5fd1ec8bb6e2ec8ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3226
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 3718340..606abd2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.translator;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
@@ -43,6 +46,10 @@
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.ResultSetId;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * An interface that takes care of executing a list of statements that are submitted through an Asterix API
  */
@@ -78,11 +85,18 @@
     }
 
     class Stats implements Serializable {
-        private static final long serialVersionUID = 5885273238208454610L;
+        private static final long serialVersionUID = 5885273238208454611L;
+
+        public enum ProfileType {
+            COUNTS,
+            FULL
+        }
 
         private long count;
         private long size;
         private long processedObjects;
+        private Profile profile;
+        private ProfileType type;
         private long diskIoCount;
         private long totalWarningsCount;
 
@@ -125,6 +139,48 @@
         public void setTotalWarningsCount(long totalWarningsCount) {
             this.totalWarningsCount = totalWarningsCount;
         }
+
+        public void setJobProfile(ObjectNode profile) {
+            this.profile = new Profile(profile);
+        }
+
+        public ObjectNode getJobProfile() {
+            return profile != null ? profile.getProfile() : null;
+        }
+
+        public ProfileType getType() {
+            return type;
+        }
+
+        public void setType(ProfileType type) {
+            this.type = type;
+        }
+    }
+
+    class Profile implements Serializable {
+        private transient ObjectNode profile;
+
+        public Profile(ObjectNode profile) {
+            this.profile = profile;
+        }
+
+        private void writeObject(ObjectOutputStream out) throws IOException {
+            ObjectMapper om = new ObjectMapper();
+            out.writeUTF(om.writeValueAsString(profile));
+        }
+
+        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+            ObjectMapper om = new ObjectMapper();
+            JsonNode inNode = om.readTree(in.readUTF());
+            if (!inNode.isObject()) {
+                throw new IOException("Deserialization error");
+            }
+            profile = (ObjectNode) inNode;
+        }
+
+        public ObjectNode getProfile() {
+            return profile;
+        }
     }
 
     /**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
index a036696..9cf8e6e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java
@@ -24,11 +24,14 @@
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.api.result.IResultMetadata;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class ResultMetadata implements IResultMetadata {
 
     private final SessionConfig.OutputFormat format;
     private long jobDuration;
     private long processedObjects;
+    private ObjectNode profile;
     private long diskIoCount;
     private Set<Warning> warnings;
     private long totalWarningsCount;
@@ -68,6 +71,14 @@
         return jobDuration;
     }
 
+    public void setJobProfile(ObjectNode profile) {
+        this.profile = profile;
+    }
+
+    public ObjectNode getJobProfile() {
+        return profile;
+    }
+
     /**
      * @return The reported warnings.
      */
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 27e685c..2c499c7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -92,7 +92,7 @@
                     new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
                             statementsText, sessionOutput.config(), resultProperties.getNcToCcResultProperties(),
                             param.getClientContextID(), handleUrl, optionalParameters, statementParameters,
-                            param.isMultiStatement(), stmtCategoryRestrictionMask, requestReference);
+                            param.isMultiStatement(), param.isProfile(), stmtCategoryRestrictionMask, requestReference);
             execution.start();
             ncMb.sendMessageToPrimaryCC(requestMsg);
             try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 52c62af..4980f5e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -22,11 +22,12 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.common.ResultMetadata;
-import org.apache.asterix.app.result.ResponseMertics;
+import org.apache.asterix.app.result.ResponseMetrics;
 import org.apache.asterix.app.result.ResponsePrinter;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.result.fields.MetricsPrinter;
+import org.apache.asterix.app.result.fields.ProfilePrinter;
 import org.apache.asterix.app.result.fields.ResultsPrinter;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -98,10 +99,13 @@
                 printer.begin();
                 printer.addResultPrinter(new ResultsPrinter(appCtx, resultReader, null, stats, sessionOutput));
                 printer.printResults();
-                ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart,
+                ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart,
                         metadata.getJobDuration(), stats.getCount(), stats.getSize(), metadata.getProcessedObjects(), 0,
                         metadata.getTotalWarningsCount(), metadata.getDiskIoCount());
-                printer.addFooterPrinter(new MetricsPrinter(mertics, HttpUtil.getPreferredCharset(request)));
+                printer.addFooterPrinter(new MetricsPrinter(metrics, HttpUtil.getPreferredCharset(request)));
+                if (metadata.getJobProfile() != null) {
+                    printer.addFooterPrinter(new ProfilePrinter(metadata.getJobProfile()));
+                }
                 printer.printFooters();
                 printer.end();
             } else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
index 566133d..ff6dcbd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceRequestParameters.java
@@ -50,6 +50,7 @@
     private boolean logicalPlan;
     private boolean optimizedLogicalPlan;
     private boolean job;
+    private boolean profile;
     private boolean signature;
     private boolean multiStatement;
 
@@ -197,6 +198,14 @@
         this.job = job;
     }
 
+    public void setProfile(boolean profile) {
+        this.profile = profile;
+    }
+
+    public boolean isProfile() {
+        return profile;
+    }
+
     public boolean isSignature() {
         return signature;
     }
@@ -230,6 +239,7 @@
         object.put("logicalPlan", logicalPlan);
         object.put("optimizedLogicalPlan", optimizedLogicalPlan);
         object.put("job", job);
+        object.put("profile", profile);
         object.put("signature", signature);
         object.put("multiStatement", multiStatement);
         object.put("parseOnly", parseOnly);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1e8f111..d193b13 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -45,13 +45,14 @@
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.result.ExecutionError;
 import org.apache.asterix.app.result.ExecutionWarning;
-import org.apache.asterix.app.result.ResponseMertics;
+import org.apache.asterix.app.result.ResponseMetrics;
 import org.apache.asterix.app.result.ResponsePrinter;
 import org.apache.asterix.app.result.fields.ClientContextIdPrinter;
 import org.apache.asterix.app.result.fields.ErrorsPrinter;
 import org.apache.asterix.app.result.fields.MetricsPrinter;
 import org.apache.asterix.app.result.fields.ParseOnlyResultPrinter;
 import org.apache.asterix.app.result.fields.PlansPrinter;
+import org.apache.asterix.app.result.fields.ProfilePrinter;
 import org.apache.asterix.app.result.fields.RequestIdPrinter;
 import org.apache.asterix.app.result.fields.SignaturePrinter;
 import org.apache.asterix.app.result.fields.StatusPrinter;
@@ -68,7 +69,6 @@
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.aql.parser.TokenMgrError;
@@ -171,6 +171,7 @@
         PARSE_ONLY("parse-only"),
         READ_ONLY("readonly"),
         JOB("job"),
+        PROFILE("profile"),
         SIGNATURE("signature"),
         MULTI_STATEMENT("multi-statement");
 
@@ -383,6 +384,7 @@
         param.setReadOnly(getOptBoolean(jsonRequest, Parameter.READ_ONLY, false));
         param.setOptimizedLogicalPlan(getOptBoolean(jsonRequest, Parameter.OPTIMIZED_LOGICAL_PLAN, false));
         param.setJob(getOptBoolean(jsonRequest, Parameter.JOB, false));
+        param.setProfile(getOptBoolean(jsonRequest, Parameter.PROFILE, false));
         param.setSignature(getOptBoolean(jsonRequest, Parameter.SIGNATURE, true));
         param.setStatementParams(
                 getOptStatementParameters(jsonRequest, jsonRequest.fieldNames(), JsonNode::get, v -> v));
@@ -405,8 +407,15 @@
         param.setTimeout(getParameter(request, Parameter.TIMEOUT));
         param.setMaxResultReads(getParameter(request, Parameter.MAX_RESULT_READS));
         param.setPlanFormat(getParameter(request, Parameter.PLAN_FORMAT));
+        param.setExpressionTree(getOptBoolean(request, Parameter.EXPRESSION_TREE, false));
+        param.setRewrittenExpressionTree(getOptBoolean(request, Parameter.REWRITTEN_EXPRESSION_TREE, false));
+        param.setLogicalPlan(getOptBoolean(request, Parameter.LOGICAL_PLAN, false));
         param.setParseOnly(getOptBoolean(request, Parameter.PARSE_ONLY, false));
         param.setReadOnly(getOptBoolean(request, Parameter.READ_ONLY, false));
+        param.setOptimizedLogicalPlan(getOptBoolean(request, Parameter.OPTIMIZED_LOGICAL_PLAN, false));
+        param.setJob(getOptBoolean(request, Parameter.JOB, false));
+        param.setProfile(getOptBoolean(request, Parameter.PROFILE, false));
+        param.setSignature(getOptBoolean(request, Parameter.SIGNATURE, true));
         param.setMultiStatement(getOptBoolean(request, Parameter.MULTI_STATEMENT, true));
         try {
             param.setStatementParams(getOptStatementParameters(request, request.getParameterNames().iterator(),
@@ -509,6 +518,7 @@
                         .serializeParameterValues(param.getStatementParams());
                 setAccessControlHeaders(request, response);
                 response.setStatus(execution.getHttpStatus());
+                stats.setType(param.isProfile() ? Stats.ProfileType.FULL : Stats.ProfileType.COUNTS);
                 executeStatement(requestRef, statementsText, sessionOutput, resultProperties, stats, param, execution,
                         optionalParams, statementParams, responsePrinter, warnings);
             }
@@ -561,15 +571,18 @@
             // in case of ASYNC delivery, the status is printed by query translator
             responsePrinter.addFooterPrinter(new StatusPrinter(execution.getResultStatus()));
         }
-        final ResponseMertics mertics = ResponseMertics.of(System.nanoTime() - elapsedStart, execution.duration(),
+        final ResponseMetrics metrics = ResponseMetrics.of(System.nanoTime() - elapsedStart, execution.duration(),
                 stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount,
                 stats.getTotalWarningsCount(), stats.getDiskIoCount());
-        responsePrinter.addFooterPrinter(new MetricsPrinter(mertics, resultCharset));
+        responsePrinter.addFooterPrinter(new MetricsPrinter(metrics, resultCharset));
+        if (stats.getType() == Stats.ProfileType.FULL) {
+            responsePrinter.addFooterPrinter(new ProfilePrinter(stats.getJobProfile()));
+        }
     }
 
     protected void validateStatement(String statement) throws RuntimeDataException {
         if (statement == null || statement.isEmpty()) {
-            throw new RuntimeDataException(ErrorCode.NO_STATEMENT_PROVIDED);
+            throw new RuntimeDataException(NO_STATEMENT_PROVIDED);
         }
     }
 
@@ -581,8 +594,7 @@
         Query query = (Query) stmts.get(stmts.size() - 1);
         Set<VariableExpr> extVars =
                 compilationProvider.getRewriterFactory().createQueryRewriter().getExternalVariables(query.getBody());
-        ResultUtil.ParseOnlyResult parseOnlyResult = new ResultUtil.ParseOnlyResult(extVars);
-        return parseOnlyResult;
+        return new ResultUtil.ParseOnlyResult(extVars);
     }
 
     protected void executeStatement(IRequestReference requestReference, String statementsText,
@@ -697,10 +709,9 @@
 
     public static String extractStatementParameterName(String name) {
         int ln = name.length();
-        if (ln > 1 && name.charAt(0) == '$' && Character.isLetter(name.charAt(1))) {
-            if (ln == 2 || isStatementParameterNameRest(name, 2)) {
-                return name.substring(1);
-            }
+        if (ln > 1 && name.charAt(0) == '$' && Character.isLetter(name.charAt(1))
+                && (ln == 2 || isStatementParameterNameRest(name, 2))) {
+            return name.substring(1);
         }
         return null;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index b666085..4e2fea0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -19,6 +19,9 @@
 
 package org.apache.asterix.app.message;
 
+import static org.apache.asterix.translator.IStatementExecutor.Stats.ProfileType.COUNTS;
+import static org.apache.asterix.translator.IStatementExecutor.Stats.ProfileType.FULL;
+
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.HashSet;
@@ -85,13 +88,14 @@
     private final Map<String, byte[]> statementParameters;
     private final boolean multiStatement;
     private final int statementCategoryRestrictionMask;
+    private final boolean profile;
     private final IRequestReference requestReference;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
             String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
             String clientContextID, String handleUrl, Map<String, String> optionalParameters,
-            Map<String, byte[]> statementParameters, boolean multiStatement, int statementCategoryRestrictionMask,
-            IRequestReference requestReference) {
+            Map<String, byte[]> statementParameters, boolean multiStatement, boolean profile,
+            int statementCategoryRestrictionMask, IRequestReference requestReference) {
         this.requestNodeId = requestNodeId;
         this.requestMessageId = requestMessageId;
         this.lang = lang;
@@ -104,6 +108,7 @@
         this.statementParameters = statementParameters;
         this.multiStatement = multiStatement;
         this.statementCategoryRestrictionMask = statementCategoryRestrictionMask;
+        this.profile = profile;
         this.requestReference = requestReference;
     }
 
@@ -141,6 +146,7 @@
             IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
                     compilationProvider, storageComponentProvider, new ResponsePrinter(sessionOutput));
             final IStatementExecutor.Stats stats = new IStatementExecutor.Stats();
+            stats.setType(profile ? FULL : COUNTS);
             Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters);
             final IRequestParameters requestParameters = new RequestParameters(requestReference, statementsText, null,
                     resultProperties, stats, outMetadata, clientContextID, optionalParameters, stmtParams,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index 55c1f8e..43d7cd9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.api.common.ResultMetadata;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.api.result.ResultJobRecord;
@@ -39,6 +40,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class JobResultCallback implements IJobResultCallback {
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -67,6 +70,16 @@
         aggregateJobStats(jobId, metadata);
     }
 
+    ObjectNode getProfile(JobId jobId) {
+        IJobManager jobManager =
+                ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager();
+        final JobRun run = jobManager.get(jobId);
+        if (run != null) {
+            return run.getJobProfile().toJSON();
+        }
+        return null;
+    }
+
     private void aggregateJobStats(JobId jobId, ResultMetadata metadata) {
         long processedObjects = 0;
         long diskIoCount = 0;
@@ -99,5 +112,9 @@
         metadata.setWarnings(AggregateWarnings);
         metadata.setDiskIoCount(diskIoCount);
         metadata.setTotalWarningsCount(aggregateTotalWarningsCount);
+        if (run.getFlags() != null && run.getFlags().contains(JobFlag.PROFILE_RUNTIME)) {
+            metadata.setJobProfile(getProfile(jobId));
+        }
     }
+
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
similarity index 76%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
index 03a20a5..bc91d54 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMertics.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResponseMetrics.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.app.result;
 
-public class ResponseMertics {
+public class ResponseMetrics {
 
     private long elapsedTime;
     private long executionTime;
@@ -29,21 +29,21 @@
     private long warnCount;
     private long diskIoCount;
 
-    private ResponseMertics() {
+    private ResponseMetrics() {
     }
 
-    public static ResponseMertics of(long elapsedTime, long executionTime, long resultCount, long resultSize,
+    public static ResponseMetrics of(long elapsedTime, long executionTime, long resultCount, long resultSize,
             long processedObjects, long errorCount, long warnCount, long diskIoCount) {
-        ResponseMertics mertics = new ResponseMertics();
-        mertics.elapsedTime = elapsedTime;
-        mertics.executionTime = executionTime;
-        mertics.resultCount = resultCount;
-        mertics.resultSize = resultSize;
-        mertics.processedObjects = processedObjects;
-        mertics.errorCount = errorCount;
-        mertics.warnCount = warnCount;
-        mertics.diskIoCount = diskIoCount;
-        return mertics;
+        ResponseMetrics metrics = new ResponseMetrics();
+        metrics.elapsedTime = elapsedTime;
+        metrics.executionTime = executionTime;
+        metrics.resultCount = resultCount;
+        metrics.resultSize = resultSize;
+        metrics.processedObjects = processedObjects;
+        metrics.errorCount = errorCount;
+        metrics.warnCount = warnCount;
+        metrics.diskIoCount = diskIoCount;
+        return metrics;
     }
 
     public long getElapsedTime() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
index 117441e..21ffe0b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/MetricsPrinter.java
@@ -23,7 +23,7 @@
 import java.nio.charset.StandardCharsets;
 
 import org.apache.asterix.api.http.server.ResultUtil;
-import org.apache.asterix.app.result.ResponseMertics;
+import org.apache.asterix.app.result.ResponseMetrics;
 import org.apache.asterix.common.api.Duration;
 import org.apache.asterix.common.api.IResponseFieldPrinter;
 
@@ -51,11 +51,11 @@
     }
 
     public static final String FIELD_NAME = "metrics";
-    private final ResponseMertics mertics;
+    private final ResponseMetrics metrics;
     private final Charset resultCharset;
 
-    public MetricsPrinter(ResponseMertics mertics, Charset resultCharset) {
-        this.mertics = mertics;
+    public MetricsPrinter(ResponseMetrics metrics, Charset resultCharset) {
+        this.metrics = metrics;
         this.resultCharset = resultCharset;
     }
 
@@ -67,35 +67,35 @@
         pw.print(FIELD_NAME);
         pw.print("\": {\n");
         pw.print("\t");
-        ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(mertics.getElapsedTime(), useAscii));
+        ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(metrics.getElapsedTime(), useAscii));
         pw.print("\n\t");
         ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(),
-                Duration.formatNanos(mertics.getExecutionTime(), useAscii));
+                Duration.formatNanos(metrics.getExecutionTime(), useAscii));
         pw.print("\n\t");
-        ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), mertics.getResultCount(), true);
+        ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), metrics.getResultCount(), true);
         pw.print("\n\t");
-        ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), mertics.getResultSize(), true);
+        ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), metrics.getResultSize(), true);
         pw.print("\n\t");
-        final boolean hasErrors = mertics.getErrorCount() > 0;
-        final boolean hasWarnings = mertics.getWarnCount() > 0;
-        final boolean hasDiskIoStats = mertics.getDiskIoCount() > 0;
-        ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), mertics.getProcessedObjects(),
+        final boolean hasErrors = metrics.getErrorCount() > 0;
+        final boolean hasWarnings = metrics.getWarnCount() > 0;
+        final boolean hasDiskIoStats = metrics.getDiskIoCount() > 0;
+        ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), metrics.getProcessedObjects(),
                 hasWarnings || hasErrors || hasDiskIoStats);
         pw.print("\n");
         //TODO move diskIoCount to the profile printer when it is introduced
         if (hasDiskIoStats) {
             pw.print("\t");
-            ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), mertics.getDiskIoCount(), hasWarnings || hasErrors);
+            ResultUtil.printField(pw, Metrics.DISK_IO_COUNT.str(), metrics.getDiskIoCount(), hasWarnings || hasErrors);
             pw.print("\n");
         }
         if (hasWarnings) {
             pw.print("\t");
-            ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), mertics.getWarnCount(), hasErrors);
+            ResultUtil.printField(pw, Metrics.WARNING_COUNT.str(), metrics.getWarnCount(), hasErrors);
             pw.print("\n");
         }
         if (hasErrors) {
             pw.print("\t");
-            ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), mertics.getErrorCount(), false);
+            ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), metrics.getErrorCount(), false);
             pw.print("\n");
         }
         pw.print("\t}");
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
index 90b0656..c37db07 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/NcResultPrinter.java
@@ -61,9 +61,11 @@
         IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
         List<Triple<JobId, ResultSetId, ARecordType>> resultSets = resultMetadata.getResultSets();
         if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultSets.isEmpty()) {
-            stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects());
-            stats.setDiskIoCount(responseMsg.getStats().getDiskIoCount());
-            stats.setTotalWarningsCount(responseMsg.getStats().getTotalWarningsCount());
+            IStatementExecutor.Stats responseStats = responseMsg.getStats();
+            stats.setJobProfile(responseStats.getJobProfile());
+            stats.setProcessedObjects(responseStats.getProcessedObjects());
+            stats.setDiskIoCount(responseStats.getDiskIoCount());
+            stats.setTotalWarningsCount(responseStats.getTotalWarningsCount());
             for (int i = 0; i < resultSets.size(); i++) {
                 Triple<JobId, ResultSetId, ARecordType> rsmd = resultSets.get(i);
                 ResultReader resultReader = new ResultReader(resultSet, rsmd.getLeft(), rsmd.getMiddle());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java
new file mode 100644
index 0000000..d74d25c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/ProfilePrinter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.result.fields;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.asterix.common.api.IResponseFieldPrinter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.util.DefaultIndenter;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class ProfilePrinter implements IResponseFieldPrinter {
+
+    public static final String FIELD_NAME = "profile";
+    private final ObjectNode profile;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public ProfilePrinter(ObjectNode profile) {
+        this.profile = profile;
+    }
+
+    @Override
+    public void print(PrintWriter pw) {
+        boolean hasProfile = profile != null;
+        if (hasProfile) {
+            try {
+                pw.print("\t\"" + FIELD_NAME + "\" : ");
+                ObjectMapper om = new ObjectMapper();
+                om.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
+                DefaultIndenter ind = new DefaultIndenter("\t", DefaultIndenter.SYS_LF) {
+                    @Override
+                    public void writeIndentation(JsonGenerator jg, int level) throws IOException {
+                        super.writeIndentation(jg, level + 1);
+                    }
+                };
+                DefaultPrettyPrinter pp = new DefaultPrettyPrinter();
+                pp = pp.withArrayIndenter(ind);
+                pp = pp.withObjectIndenter(ind);
+                om.writer(pp).writeValue(pw, profile);
+            } catch (IOException e) {
+                LOGGER.error("Unable to print job profile", e);
+
+            }
+        }
+    }
+
+    @Override
+    public String getName() {
+        return FIELD_NAME;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1838c4c..a53ca88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -391,6 +391,9 @@
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
                         metadataProvider.setMaxResultReads(maxResultReads);
+                        if (stats.getType() == Stats.ProfileType.FULL) {
+                            this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
+                        }
                         handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats,
                                 requestParameters, stmtParams, stmtRewriter);
                         break;
@@ -2549,6 +2552,9 @@
                 (org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService()
                         .getResultMetadata(jobId, rsId);
         stats.setProcessedObjects(resultMetadata.getProcessedObjects());
+        if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
+            stats.setJobProfile(resultMetadata.getJobProfile());
+        }
         stats.setDiskIoCount(resultMetadata.getDiskIoCount());
         stats.setTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         warningCollector.warn(resultMetadata.getWarnings());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index eb708ce..60d44a1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -51,6 +51,7 @@
         RESULTS("results"),
         REQUEST_ID("requestID"),
         METRICS("metrics"),
+        PROFILE("profile"),
         CLIENT_CONTEXT_ID("clientContextID"),
         SIGNATURE("signature"),
         STATUS("status"),
@@ -93,6 +94,10 @@
         return extract(resultStream, EnumSet.of(ResultField.METRICS), resultCharset).getResult();
     }
 
+    public static InputStream extractProfile(InputStream resultStream, Charset resultCharset) throws Exception {
+        return extract(resultStream, EnumSet.of(ResultField.PROFILE), resultCharset).getResult();
+    }
+
     public static InputStream extractPlans(InputStream resultStream, Charset resultCharset) throws Exception {
         return extract(resultStream, EnumSet.of(ResultField.PLANS), resultCharset).getResult();
     }
@@ -167,6 +172,7 @@
                     break;
                 case REQUEST_ID:
                 case METRICS:
+                case PROFILE:
                 case CLIENT_CONTEXT_ID:
                 case SIGNATURE:
                 case STATUS:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 55b8146..7319ab5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -164,6 +164,7 @@
     public static final String DELIVERY_IMMEDIATE = "immediate";
     public static final String DIAGNOSE = "diagnose";
     private static final String METRICS_QUERY_TYPE = "metrics";
+    private static final String PROFILE_QUERY_TYPE = "profile";
     private static final String PLANS_QUERY_TYPE = "plans";
 
     private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
@@ -951,6 +952,7 @@
             case "parse":
             case "deferred":
             case "metrics":
+            case "profile":
             case "plans":
                 // isDmlRecoveryTest: insert Crash and Recovery
                 if (isDmlRecoveryTest) {
@@ -1298,6 +1300,9 @@
                 case METRICS_QUERY_TYPE:
                     resultStream = ResultExtractor.extractMetrics(resultStream, responseCharset);
                     break;
+                case PROFILE_QUERY_TYPE:
+                    resultStream = ResultExtractor.extractProfile(resultStream, responseCharset);
+                    break;
                 case PLANS_QUERY_TYPE:
                     resultStream = ResultExtractor.extractPlans(resultStream, responseCharset);
                     break;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
index 0cfeee7..7695698 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
@@ -218,4 +218,16 @@
         objectMapper.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
         return objectMapper;
     }
+
+    public static void main(String[] args) throws Exception {
+        ObjectMapper om = createObjectMapper();
+        String patternFile = args[0];
+        String instanceFile = args[1];
+        if (equalJson(om.readTree(new File(patternFile)), om.readTree(new File(instanceFile)))) {
+            System.out.println(instanceFile + " matches " + patternFile);
+        } else {
+            System.out.println(instanceFile + " does not match " + patternFile);
+            System.exit(1);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
new file mode 100644
index 0000000..7c945fc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ProfiledExecutionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the runtime tests with per-operator profiling enabled.
+ */
+@RunWith(Parameterized.class)
+public class ProfiledExecutionTest {
+    protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "ProfiledExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.buildTestsInXml("profiled.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public ProfiledExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-single.conf b/asterixdb/asterix-app/src/test/resources/cc-single.conf
new file mode 100644
index 0000000..419abf7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-single.conf
@@ -0,0 +1,50 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;   http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied.  See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.pagesize=32KB
+storage.buffercache.size=48MB
+storage.memorycomponent.globalbudget=512MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+messaging.frame.size=4096
+messaging.frame.count=512
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
new file mode 100644
index 0000000..496927e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+            QueryFileExtension=".sqlpp">
+  <test-group name="profile">
+    <test-case FilePath="profile">
+      <compilation-unit name="full-scan">
+        <parameter name="profile" value="true" />
+        <output-dir compare="Text">full-scan</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..441bee4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Per-operator runtime profile on a full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+  number : bigint,
+  street : string,
+  city : string
+};
+
+create type test.CustomerType as
+ closed {
+  cid : bigint,
+  name : string,
+  age : bigint?,
+  address : AddressType?,
+  lastorder : {
+      oid : bigint,
+      total : float
+  }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp
new file mode 100644
index 0000000..2c5d5d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Per-operator runtime profile on a full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+  ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+  (`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp
new file mode 100644
index 0000000..9db1d12
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.3.profile.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Per-operator runtime profile on a full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp
new file mode 100644
index 0000000..4cccd9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/full-scan/full-scan.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Per-operator runtime profile on a full scan
+ * Expected Res : Success
+ * Date         : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
new file mode 100644
index 0000000..82e7128
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/full-scan/full-scan.3.regexjson
@@ -0,0 +1,64 @@
+{
+    "job-id": "R{[A-Z0-9.:]+}",
+    "counters": [],
+    "joblets": [
+        {
+            "node-id": "R{.+}",
+            "counters": [],
+            "tasks": [
+                {
+                    "activity-id": "R{[A-Z0-9.:]+}",
+                    "partition": "R{[0-9]+}",
+                    "attempt": "R{[0-9]+}",
+                    "partition-send-profile": [
+                        {
+                            "partition-id": {
+                                "job-id": "R{[A-Z0-9.:]+}",
+                                "connector-id": "R{[A-Z0-9.:]+}",
+                                "sender-index": "R{[0-9]+}",
+                                "receiver-index": "R{[0-9]+}"
+                            },
+                            "open-time": "R{[0-9]+}",
+                            "close-time": "R{[0-9]+}",
+                            "offset": "R{[0-9]+}",
+                            "frame-times": [
+                                0
+                            ],
+                            "resolution": 1
+                        }
+                    ],
+                    "counters": [
+                        {
+                            "name": "Empty Tuple Source",
+                            "time": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "Index Search",
+                            "time": "R{[0-9.]+}"
+                        },
+                        {
+                            "name": "R{.+}",
+                            "time": "R{[0-9.]+}"
+                        }
+                    ]
+                },
+                {
+                  "activity-id": "R{[A-Z0-9.:]+}",
+                  "partition": "R{[0-9]+}",
+                  "attempt": "R{[0-9]+}",
+                  "partition-send-profile": [],
+                  "counters": [
+                    {
+                      "name": "R{.+}",
+                      "time": "R{[0-9.]+}"
+                    },
+                    {
+                      "name": "Result Writer",
+                      "time": "R{[0-9.]+}"
+                    }
+                  ]
+                }
+            ]
+        }
+    ]
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
index 6d648b1..ea29643 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/IARecordBuilder.java
@@ -20,9 +20,7 @@
 package org.apache.asterix.builders;
 
 import java.io.DataOutput;
-import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
@@ -55,7 +53,7 @@
      *            The field name.
      * @param value
      *            The field value.
-     * @throws AsterixException
+     * @throws HyracksDataException
      *             if the field name conflicts with a closed field name
      */
     public void addField(IValueReference name, IValueReference value) throws HyracksDataException;
@@ -66,8 +64,7 @@
      * @param writeTypeTag
      *            Whether to write a typetag as part of the record's serialized
      *            representation.
-     * @throws IOException
-     * @throws AsterixException
+     * @throws HyracksDataException
      *             if any open field names conflict with each other
      */
     public void write(DataOutput out, boolean writeTypeTag) throws HyracksDataException;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java
index 2977470..a98e2dd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/constructors/OpenRecordConstructorDescriptor.java
@@ -118,4 +118,9 @@
             }
         };
     }
+
+    @Override
+    public String toString() {
+        return "Open Record Constructor";
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 7887dde..07d4e94 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
+import org.apache.hyracks.api.dataflow.TimedFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
@@ -60,15 +61,19 @@
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
             final IFrameWriter writer, long memoryBudget) throws HyracksDataException {
+        final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
         final RunningAggregatorOutput outputWriter =
                 new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length + decorFieldIdx.length, writer);
-        // should enforce protocol
-        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
-        IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
+        IFrameWriter fw = outputWriter;
+        if (profile) {
+            fw = TimedFrameWriter.time(outputWriter, ctx, "Aggregate Writer");
+        } else if (enforce) {
+            fw = EnforceFrameWriter.enforce(outputWriter);
+        }
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
-            pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], enforcedWriter,
-                    ctx, null);
+            pipelines[i] = (NestedTupleSourceRuntime) PipelineAssembler.assemblePipeline(subplans[i], fw, ctx, null);
         }
 
         final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
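
The hunk above also shows the precedence rule used throughout this change: when PROFILE_RUNTIME is set the output writer is wrapped for timing, and contract enforcement is applied only otherwise, so the timed path carries no extra indirection. A minimal sketch of that selection using only the classes already imported here (the helper class and method names are hypothetical):

import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
import org.apache.hyracks.api.dataflow.TimedFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;

final class WriterWrapping {
    // Profiling takes precedence over contract enforcement, mirroring the
    // aggregator change above; a profiled writer is never double-wrapped.
    static IFrameWriter wrap(IFrameWriter w, IHyracksTaskContext ctx, String name) throws HyracksDataException {
        if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) {
            return TimedFrameWriter.time(w, ctx, name);
        }
        if (ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT)) {
            return EnforceFrameWriter.enforce(w);
        }
        return w;
    }
}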
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 07365db..b3fef7f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -121,6 +121,11 @@
                 throw exception;
             }
         }
+
+        @Override
+        public String getDisplayName() {
+            return "Empty Tuple Source";
+        }
     }
 
     private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index f5477ec..81f5d08 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -56,13 +56,13 @@
     public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
         // should enforce protocol
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
         // plug the operators
         IFrameWriter start = writer;// this.writer;
         IPushRuntimeFactory[] runtimeFactories = pipeline.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            start = enforce ? EnforceFrameWriter.enforce(start) : start;
-
+            start = (enforce && !profile) ? EnforceFrameWriter.enforce(start) : start;
             IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
             IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
             for (int j = 0; j < newRuntimes.length; j++) {
@@ -99,12 +99,13 @@
             IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException {
         // should enforce protocol
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
         // plug the operators
         IFrameWriter start = writer;
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            start = enforce ? EnforceFrameWriter.enforce(start) : start;
+            start = (enforce && !profile) ? EnforceFrameWriter.enforce(start) : start;
             IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
             IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
             IPushRuntime newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntimes[0]) : newRuntimes[0];
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 3cdc5aed..5343069 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -79,7 +79,7 @@
             if (i > 0) {
                 sb.append(", ");
             }
-            sb.append(evalFactories[i]);
+            sb.append(evalFactories[i].toString());
         }
         sb.append("]");
         return sb.toString();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java
similarity index 94%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java
index 0bcb246..0490576 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/counters/Counter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/com/job/profiling/counters/Counter.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.control.common.job.profiling.counters;
+package org.apache.hyracks.api.com.job.profiling.counters;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 
+@SuppressWarnings("squid:S1700")
 public class Counter implements ICounter {
     private static final long serialVersionUID = -3935601595055562080L;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
new file mode 100644
index 0000000..6afbccb
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IPassableTimer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+/**
+ * A timer intended for timing the individual components of a pipelined
+ * process. An instance is held by each operator in the pipeline; it is
+ * paused when the operator passes control to a component above it and
+ * resumed when that component returns.
+ */
+public interface IPassableTimer {
+
+    void pause();
+
+    void resume();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java
new file mode 100644
index 0000000..83a4b34
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedFrameWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public class TimedFrameWriter implements IFrameWriter, IPassableTimer {
+
+    // The downstream data consumer of this writer.
+    private final IFrameWriter writer;
+    private long frameStart = 0;
+    final ICounter counter;
+    final IStatsCollector collector;
+    final String name;
+
+    public TimedFrameWriter(IFrameWriter writer, IStatsCollector collector, String name, ICounter counter) {
+        this.writer = writer;
+        this.collector = collector;
+        this.name = name;
+        this.counter = counter;
+    }
+
+    protected TimedFrameWriter(IFrameWriter writer, IStatsCollector collector, String name) {
+        this(writer, collector, name, collector.getOrAddOperatorStats(name).getTimeCounter());
+    }
+
+    @Override
+    public final void open() throws HyracksDataException {
+        try {
+            startClock();
+            writer.open();
+        } finally {
+            stopClock();
+        }
+    }
+
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        try {
+            startClock();
+            writer.nextFrame(buffer);
+        } finally {
+            stopClock();
+        }
+    }
+
+    @Override
+    public final void flush() throws HyracksDataException {
+        try {
+            startClock();
+            writer.flush();
+        } finally {
+            stopClock();
+        }
+    }
+
+    @Override
+    public final void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            startClock();
+            writer.close();
+        } finally {
+            stopClock();
+        }
+    }
+
+    private void stopClock() {
+        pause();
+        collector.giveClock(this);
+    }
+
+    private void startClock() {
+        if (frameStart > 0) {
+            return;
+        }
+        frameStart = collector.takeClock(this);
+    }
+
+    @Override
+    public void resume() {
+        if (frameStart > 0) {
+            return;
+        }
+        long nt = System.nanoTime();
+        frameStart = nt;
+    }
+
+    @Override
+    public void pause() {
+        if (frameStart > 1) {
+            long nt = System.nanoTime();
+            long delta = nt - frameStart;
+            counter.update(delta);
+            frameStart = -1;
+        }
+    }
+
+    public static IFrameWriter time(IFrameWriter writer, IHyracksTaskContext ctx, String name)
+            throws HyracksDataException {
+        return writer instanceof TimedFrameWriter ? writer
+                : new TimedFrameWriter(writer, ctx.getStatsCollector(), name);
+    }
+}
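
TimedFrameWriter charges the wall-clock time spent in open, nextFrame, flush, and close to the named operator's time counter, handing the clock back to the collector on each exit. A hedged usage sketch (the operator name, writer, and frame are placeholders):

import java.nio.ByteBuffer;

import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.TimedFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;

final class TimedWriterUsage {
    static void push(IHyracksTaskContext ctx, IFrameWriter downstream, ByteBuffer frame)
            throws HyracksDataException {
        // Wrap once; time(...) returns the writer unchanged if it is already timed.
        IFrameWriter timed = TimedFrameWriter.time(downstream, ctx, "My Operator");
        timed.open();
        try {
            timed.nextFrame(frame);
        } finally {
            timed.close();
        }
        // The accumulated nanoseconds are readable from the task's stats collector.
        long nanos = ctx.getStatsCollector().getOrAddOperatorStats("My Operator").getTimeCounter().get();
        System.out.println("My Operator: " + nanos + " ns");
    }
}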
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
new file mode 100644
index 0000000..2d46bea
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/TimedOperatorNodePushable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.util.HashMap;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.rewriter.runtime.SuperActivityOperatorNodePushable;
+
+public class TimedOperatorNodePushable extends TimedFrameWriter implements IOperatorNodePushable, IPassableTimer {
+
+    IOperatorNodePushable op;
+    HashMap<Integer, IFrameWriter> inputs;
+    long frameStart;
+
+    TimedOperatorNodePushable(IOperatorNodePushable op, IStatsCollector collector) throws HyracksDataException {
+        super(null, collector, op.getDisplayName());
+        this.op = op;
+        inputs = new HashMap<>();
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        synchronized (collector) {
+            startClock();
+            op.initialize();
+            stopClock();
+        }
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        synchronized (collector) {
+            startClock();
+            op.deinitialize();
+            stopClock();
+        }
+    }
+
+    @Override
+    public int getInputArity() {
+        return op.getInputArity();
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+            throws HyracksDataException {
+        op.setOutputFrameWriter(index, writer, recordDesc);
+    }
+
+    @Override
+    public IFrameWriter getInputFrameWriter(int index) {
+        IFrameWriter ifw = op.getInputFrameWriter(index);
+        if (!(op instanceof TimedFrameWriter) && ifw.equals(op)) {
+            return new TimedFrameWriter(op.getInputFrameWriter(index), collector, op.getDisplayName(), counter);
+        }
+        return op.getInputFrameWriter(index);
+    }
+
+    @Override
+    public String getDisplayName() {
+        return op.getDisplayName();
+    }
+
+    private void stopClock() {
+        pause();
+        collector.giveClock(this);
+    }
+
+    private void startClock() {
+        if (frameStart > 0) {
+            return;
+        }
+        frameStart = collector.takeClock(this);
+    }
+
+    @Override
+    public void resume() {
+        if (frameStart > 0) {
+            return;
+        }
+        long nt = System.nanoTime();
+        frameStart = nt;
+    }
+
+    @Override
+    public void pause() {
+        if (frameStart > 0) {
+            long nt = System.nanoTime();
+            long delta = nt - frameStart;
+            counter.update(delta);
+            frameStart = -1;
+        }
+    }
+
+    public static IOperatorNodePushable time(IOperatorNodePushable op, IHyracksTaskContext ctx)
+            throws HyracksDataException {
+        if (!(op instanceof TimedOperatorNodePushable) && !(op instanceof SuperActivityOperatorNodePushable)) {
+            return new TimedOperatorNodePushable(op, ctx.getStatsCollector());
+        }
+        return op;
+    }
+}
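
TimedOperatorNodePushable decorates a pushable so that its initialize/deinitialize calls and its input frame writers are timed; the super-activity assembler further below applies it to every runtime it creates when profiling is on. A condensed sketch of that pattern (the helper class is hypothetical):

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TimedOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;

final class PushableCreation {
    static IOperatorNodePushable create(IActivity activity, IHyracksTaskContext ctx,
            IRecordDescriptorProvider rdp, int partition, int nPartitions) throws HyracksDataException {
        IOperatorNodePushable op = activity.createPushRuntime(ctx, rdp, partition, nPartitions);
        // time(...) leaves already-timed pushables and SuperActivityOperatorNodePushable untouched.
        return ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME) ? TimedOperatorNodePushable.time(op, ctx) : op;
    }
}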
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
index 1e73146..8930d34 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -19,7 +19,9 @@
 package org.apache.hyracks.api.job.profiling;
 
 import java.io.Serializable;
+import java.util.Map;
 
+import org.apache.hyracks.api.dataflow.IPassableTimer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IWritable;
 
@@ -36,13 +38,34 @@
     /**
      * @param operatorName
      * @return {@link IOperatorStats} for the operator with name <code>operatorName</code>
-     * if one exists or else null.
+     * if it already exists; otherwise a new one is created, registered, and returned.
      */
-    IOperatorStats getOperatorStats(String operatorName);
+    IOperatorStats getOrAddOperatorStats(String operatorName);
+
+    /**
+     * Get every registered operator stats object
+     * @return all registered operators and their collected stats, keyed by operator name
+     */
+    Map<String, IOperatorStats> getAllOperatorStats();
 
     /**
      * @return A special {@link IOperatorStats} that has the aggregated stats
      * from all operators in the collection.
      */
     IOperatorStats getAggregatedStats();
+
+    /**
+     * Pause the current clock holder's timer and hand the clock to another operator
+     * @param newHolder the timer of the operator that is starting execution
+     * @return the nanoTime at which the clock changed hands
+     */
+    long takeClock(IPassableTimer newHolder);
+
+    /**
+     * Return the clock and resume the previous holder's timer, once the downstream
+     * operator has finished the method the upstream operator called on it
+     * @param currHolder the timer of the operator relinquishing the clock
+     */
+    void giveClock(IPassableTimer currHolder);
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
similarity index 87%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
index 66fb797..08c1adc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java
@@ -16,15 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.control.common.job.profiling;
+package org.apache.hyracks.api.job.profiling;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
 
 public class OperatorStats implements IOperatorStats {
     private static final long serialVersionUID = 6401830963367567167L;
@@ -85,4 +84,10 @@
         timeCounter.set(input.readLong());
         diskIoCounter.set(input.readLong());
     }
+
+    @Override
+    public String toString() {
+        return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": "
+                + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + " }";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 12d696e..168d7dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -22,11 +22,13 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -41,6 +43,7 @@
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TimedOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -94,15 +97,22 @@
     }
 
     private void init() throws HyracksDataException {
-        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
+        LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
         List<IConnectorDescriptor> outputConnectors;
         final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
         /*
          * Set up the source operators
          */
+        Set<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> sources = new HashSet<>();
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable =
-                    entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+            IOperatorNodePushable opPushable = null;
+            if (profile) {
+                opPushable = TimedOperatorNodePushable
+                        .time(entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions), ctx);
+            } else {
+                opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+            }
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
             inputArity += opPushable.getInputArity();
@@ -110,6 +120,7 @@
                     MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList());
             for (IConnectorDescriptor conn : outputConnectors) {
                 childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+                sources.add(childQueue.peekLast());
             }
         }
 
@@ -128,8 +139,13 @@
             IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
             IOperatorNodePushable destOp = operatorNodePushables.get(destId);
             if (destOp == null) {
-                destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
-                        nPartitions);
+                if (profile) {
+                    destOp = TimedOperatorNodePushable.time(channel.getRight().getLeft().createPushRuntime(ctx,
+                            recordDescProvider, partition, nPartitions), ctx);
+                } else {
+                    destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
+                            nPartitions);
+                }
                 operatorNodePushablesBFSOrder.add(destOp);
                 operatorNodePushables.put(destId, destOp);
             }
@@ -138,7 +154,7 @@
              * construct the dataflow connection from a producer to a consumer
              */
             IFrameWriter writer = destOp.getInputFrameWriter(inputChannel);
-            writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+            writer = (enforce && !profile) ? EnforceFrameWriter.enforce(writer) : writer;
             sourceOp.setOutputFrameWriter(outputChannel, writer,
                     recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
 
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java
index 6cb90e3..1a878a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/AggregateCounter.java
@@ -19,7 +19,7 @@
 
 package org.apache.hyracks.client.stats;
 
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 
 public class AggregateCounter extends Counter {
     private static final long serialVersionUID = 9140555872026977436L;
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java
index 0d3affc..84831e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/stats/impl/ClientCounterContext.java
@@ -30,11 +30,11 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.client.stats.AggregateCounter;
 import org.apache.hyracks.client.stats.Counters;
 import org.apache.hyracks.client.stats.IClusterCounterContext;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
index 8da19ac..878d7c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -21,20 +21,25 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.dataflow.IPassableTimer;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.job.profiling.OperatorStats;
 
 public class StatsCollector implements IStatsCollector {
-    private static final long serialVersionUID = 6858817639895434577L;
+    private static final long serialVersionUID = 6858817639895434578L;
 
-    private final Map<String, IOperatorStats> operatorStatsMap = new HashMap<>();
+    private final Map<String, IOperatorStats> operatorStatsMap = new LinkedHashMap<>();
+    private transient Deque<IPassableTimer> clockHolder = new ArrayDeque<>();
 
     @Override
-    public void add(IOperatorStats operatorStats) throws HyracksDataException {
+    public void add(IOperatorStats operatorStats) {
         if (operatorStatsMap.containsKey(operatorStats.getName())) {
             throw new IllegalArgumentException("Operator with the same name already exists");
         }
@@ -42,8 +47,13 @@
     }
 
     @Override
-    public IOperatorStats getOperatorStats(String operatorName) {
-        return operatorStatsMap.get(operatorName);
+    public IOperatorStats getOrAddOperatorStats(String operatorName) {
+        return operatorStatsMap.computeIfAbsent(operatorName, OperatorStats::new);
+    }
+
+    @Override
+    public Map<String, IOperatorStats> getAllOperatorStats() {
+        return Collections.unmodifiableMap(operatorStatsMap);
     }
 
     public static StatsCollector create(DataInput input) throws IOException {
@@ -79,4 +89,24 @@
             operatorStatsMap.put(opStats.getName(), opStats);
         }
     }
+
+    @Override
+    public long takeClock(IPassableTimer newHolder) {
+        if (newHolder != null) {
+            if (clockHolder.peek() != null) {
+                clockHolder.peek().pause();
+            }
+            clockHolder.push(newHolder);
+        }
+        return System.nanoTime();
+    }
+
+    @Override
+    public void giveClock(IPassableTimer currHolder) {
+        clockHolder.removeLastOccurrence(currHolder);
+        if (clockHolder.peek() != null) {
+            clockHolder.peek().resume();
+        }
+    }
+
 }
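
The clock-handoff methods keep a stack of timers: taking the clock pauses whoever currently holds it, and giving it back resumes the previous holder. A small self-contained demo of that behavior, assuming the collector's default constructor; the anonymous timers just print when they are paused and resumed:

import org.apache.hyracks.api.dataflow.IPassableTimer;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;

final class ClockHandoffDemo {
    public static void main(String[] args) {
        StatsCollector collector = new StatsCollector();
        IPassableTimer upstream = timer("upstream");
        IPassableTimer downstream = timer("downstream");

        collector.takeClock(upstream);    // stack: [upstream]
        collector.takeClock(downstream);  // prints "upstream paused"; stack: [downstream, upstream]
        collector.giveClock(downstream);  // prints "upstream resumed"; stack: [upstream]
        collector.giveClock(upstream);    // stack is empty again
    }

    private static IPassableTimer timer(String name) {
        return new IPassableTimer() {
            @Override
            public void pause() {
                System.out.println(name + " paused");
            }

            @Override
            public void resume() {
                System.out.println(name + " resumed");
            }
        };
    }
}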
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 6dfb71d..1e7a3b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -21,6 +21,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.text.DecimalFormat;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,6 +30,7 @@
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.job.profiling.StatsCollector;
@@ -116,10 +118,24 @@
             json.set("partition-send-profile", pspArray);
         }
         populateCounters(json);
-
         return json;
     }
 
+    @Override
+    protected void populateCounters(ObjectNode json) {
+        ObjectMapper om = new ObjectMapper();
+        Map<String, IOperatorStats> opTimes = statsCollector.getAllOperatorStats();
+        ArrayNode countersObj = om.createArrayNode();
+        opTimes.forEach((key, value) -> {
+            ObjectNode jpe = om.createObjectNode();
+            jpe.put("name", key);
+            jpe.put("time", Double
+                    .parseDouble(new DecimalFormat("#.####").format((double) value.getTimeCounter().get() / 1000000)));
+            countersObj.add(jpe);
+        });
+        json.set("counters", countersObj);
+    }
+
     public IStatsCollector getStatsCollector() {
         return statsCollector;
     }
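
populateCounters converts each operator's accumulated nanosecond counter into milliseconds (at most four decimal places) and emits one entry per operator under the task's "counters" array. A hedged sketch of how a client might read that array; the operator names and timings below are made up:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

final class CountersReader {
    public static void main(String[] args) throws Exception {
        // Example payload with the same shape as the "counters" array built above.
        String counters = "[{\"name\": \"BTree Search\", \"time\": 12.3456}, {\"name\": \"Result Writer\", \"time\": 0.789}]";
        for (JsonNode entry : new ObjectMapper().readTree(counters)) {
            System.out.println(entry.get("name").asText() + " -> " + entry.get("time").asDouble() + " ms");
        }
    }
}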
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8afdc9e..d9b3961 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
 import org.apache.hyracks.api.comm.PartitionChannel;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -53,7 +54,6 @@
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.profiling.StatsCollector;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
 import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 81b9d5e..00de038 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
@@ -66,7 +67,6 @@
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.profiling.StatsCollector;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
 import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 09a4c18..69f1113 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -169,6 +169,7 @@
                 List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
                 if (outputs != null) {
                     final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
+                    final boolean profile = flags.contains(JobFlag.PROFILE_RUNTIME);
                     for (int i = 0; i < outputs.size(); ++i) {
                         final IConnectorDescriptor conn = outputs.get(i);
                         RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
@@ -179,7 +180,7 @@
                         LOGGER.trace("input: {}: {}", i, conn.getConnectorId());
                         IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
                                 td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
-                        writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+                        writer = (enforce && !profile) ? EnforceFrameWriter.enforce(writer) : writer;
                         operator.setOutputFrameWriter(i, writer, recordDesc);
                     }
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
index 7ca01a5..c545e7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -30,12 +30,14 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.sort.AbstractExternalSortRunMerger;
-import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
+import org.apache.hyracks.dataflow.std.sort.IRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.TimedRunGenerator;
 
 /**
  * This Operator pushes group-by aggregation into the external sort.
@@ -122,7 +124,7 @@
             RecordDescriptor outRecordDesc, boolean finalStage) {
         super(spec, framesLimit, sortFields, keyNormalizerFactories, comparatorFactories, outRecordDesc);
         if (framesLimit <= 1) {
-            throw new IllegalStateException();// minimum of 2 fames (1 in,1 out)
+            throw new IllegalStateException(); // minimum of 2 frames (1 in,1 out)
         }
 
         this.groupFields = groupFields;
@@ -139,12 +141,14 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+            protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescriptorProvider) throws HyracksDataException {
-                return new ExternalSortGroupByRunGenerator(ctx, sortFields,
+                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
+                IRunGenerator runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
                         recordDescriptorProvider.getInputRecordDescriptor(this.getActivityId(), 0), framesLimit,
                         groupFields, keyNormalizerFactories, comparatorFactories, partialAggregatorFactory,
                         partialAggRecordDesc, ALG);
+                return profile ? TimedRunGenerator.time(runGen, ctx, "GroupBy (Sort Runs)") : runGen;
             }
         };
     }
@@ -165,4 +169,14 @@
             }
         };
     }
+
+    @Override
+    public String getDisplayName() {
+        return "GroupBy (Sort)";
+    }
+
+    @Override
+    public String toString() {
+        return getDisplayName();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 2fd17da..6d2b485 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -320,6 +320,11 @@
                     isFailed = true;
                 }
 
+                @Override
+                public String getDisplayName() {
+                    return "Hybrid Hash Join: Build";
+                }
+
             };
         }
     }
@@ -800,6 +805,11 @@
                         outerReader.close();
                     }
                 }
+
+                @Override
+                public String getDisplayName() {
+                    return "Hybrid Hash Join: Probe & Join";
+                }
             };
             return op;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 8006fc6..a4cfa13 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -146,6 +146,11 @@
                 sb.append("\"maxReads\": ").append(maxReads).append(" }");
                 return sb.toString();
             }
+
+            @Override
+            public String getDisplayName() {
+                return "Result Writer";
+            }
         };
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 6abc064..6ff79b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -102,14 +102,14 @@
             super(id);
         }
 
-        protected abstract AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+        protected abstract IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider) throws HyracksDataException;
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryInputSinkOperatorNodePushable() {
-                private AbstractSortRunGenerator runGen;
+                private IRunGenerator runGen;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -139,6 +139,11 @@
                 public void fail() throws HyracksDataException {
                     runGen.fail();
                 }
+
+                @Override
+                public String getDisplayName() {
+                    return "Sort (Run Generation)";
+                }
             };
         }
     }
@@ -155,6 +160,7 @@
                 IBinaryComparator[] comparators, INormalizedKeyComputer nmkComputer, int necessaryFrames);
 
         @Override
+        @SuppressWarnings("squid:S1188")
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -203,6 +209,11 @@
                         }
                     }
                 }
+
+                @Override
+                public String getDisplayName() {
+                    return "Sort (Run Merge)";
+                }
             };
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 8b80a26..654f3a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 
@@ -76,10 +77,12 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+            protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) throws HyracksDataException {
-                return new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories, comparatorFactories,
-                        outRecDescs[0], alg, policy, framesLimit, outputLimit);
+                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
+                IRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields, keyNormalizerFactories,
+                        comparatorFactories, outRecDescs[0], alg, policy, framesLimit, outputLimit);
+                return profile ? TimedRunGenerator.time(runGen, ctx, "ExternalSort(Sort)") : runGen;
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java
index 02880dd..d43d02d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -33,4 +33,6 @@
      * @return the list of generated (sorted) runs
      */
     List<GeneratedRunFileReader> getRuns();
+
+    ISorter getSorter();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java
new file mode 100644
index 0000000..b3a4aee
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TimedRunGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.sort;
+
+import java.util.List;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TimedFrameWriter;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
+
+public class TimedRunGenerator extends TimedFrameWriter implements IRunGenerator {
+
+    private final IRunGenerator runGenerator;
+
+    private TimedRunGenerator(IRunGenerator runGenerator, IStatsCollector collector, String name) {
+        super(runGenerator, collector, name);
+        this.runGenerator = runGenerator;
+    }
+
+    @Override
+    public List<GeneratedRunFileReader> getRuns() {
+        return runGenerator.getRuns();
+    }
+
+    @Override
+    public ISorter getSorter() {
+        return runGenerator.getSorter();
+    }
+
+    public static IRunGenerator time(IRunGenerator runGenerator, IHyracksTaskContext ctx, String name) {
+        return runGenerator instanceof TimedRunGenerator ? runGenerator
+                : new TimedRunGenerator(runGenerator, ctx.getStatsCollector(), name);
+    }
+}
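
TimedRunGenerator lets the sort-based operators time their run-generation phase separately from the merge phase; the sort descriptors wrap the generator only when the job was started with PROFILE_RUNTIME. A sketch of that conditional wrapping (helper name hypothetical):

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.std.sort.IRunGenerator;
import org.apache.hyracks.dataflow.std.sort.TimedRunGenerator;

final class RunGeneratorProfiling {
    // Matches the pattern used by SortGroupBy, ExternalSort, and TopKSort:
    // wrap the run generator only for profiled jobs, otherwise keep it bare.
    static IRunGenerator maybeTime(IRunGenerator runGen, IHyracksTaskContext ctx, String name) {
        return ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)
                ? TimedRunGenerator.time(runGen, ctx, name)
                : runGen;
    }
}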
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
index b29057f..b7ff530 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TopKSorterOperatorDescriptor.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 
 public class TopKSorterOperatorDescriptor extends AbstractSorterOperatorDescriptor {
@@ -59,10 +60,12 @@
             private static final long serialVersionUID = 1L;
 
             @Override
-            protected AbstractSortRunGenerator getRunGenerator(IHyracksTaskContext ctx,
+            protected IRunGenerator getRunGenerator(IHyracksTaskContext ctx,
                     IRecordDescriptorProvider recordDescProvider) {
-                return new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields, keyNormalizerFactories,
-                        comparatorFactories, outRecDescs[0]);
+                final boolean profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
+                IRunGenerator runGen = new HybridTopKSortRunGenerator(ctx, framesLimit, topK, sortFields,
+                        keyNormalizerFactories, comparatorFactories, outRecDescs[0]);
+                return profile ? TimedRunGenerator.time(runGen, ctx, "TopKSort (Sort)") : runGen;
 
             }
         };
@@ -82,4 +85,10 @@
             }
         };
     }
+
+    @Override
+    public String getDisplayName() {
+        return "Top K Sort";
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index fd985db..6f59ed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -27,7 +27,6 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,7 +37,6 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultSet;
@@ -138,7 +136,7 @@
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info(spec.toJSON().asText());
         }
-        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        JobId jobId = hcc.startJob(spec);
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info(jobId.toString());
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 566d8e2..001e250 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -101,4 +101,9 @@
                 searchCallbackProceedResultTrueValue);
     }
 
+    @Override
+    public String getDisplayName() {
+        return "BTree Search";
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index 12dc310..44af086 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
 import org.apache.hyracks.api.test.CountAndThrowError;
 import org.apache.hyracks.api.test.CountAndThrowException;
 import org.apache.hyracks.api.test.CountAnswer;
@@ -307,6 +308,7 @@
         IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
         IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
         INCServiceContext serviceCtx = Mockito.mock(INCServiceContext.class);
+        IStatsCollector collector = Mockito.mock(IStatsCollector.class);
         Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer());
         Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer());
         Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE);
@@ -314,6 +316,7 @@
                 .thenReturn(mockByteBuffer());
         Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
         Mockito.when(jobletCtx.getServiceContext()).thenReturn(serviceCtx);
+        Mockito.when(ctx.getStatsCollector()).thenReturn(collector);
         return new IHyracksTaskContext[] { ctx };
     }
 
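
Unit tests that drive pushables against a mocked task context now also need a stats collector, as stubbed above. A minimal Mockito sketch; stubbing getOrAddOperatorStats is an assumption based on how IndexSearchOperatorNodePushable uses the collector below:

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.mockito.Mockito;

public class StatsCollectorMockSketch {
    // Sketch only: a task context whose collector hands back a mocked IOperatorStats.
    public static IHyracksTaskContext mockContextWithStats() {
        IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
        IStatsCollector collector = Mockito.mock(IStatsCollector.class);
        IOperatorStats stats = Mockito.mock(IOperatorStats.class);
        Mockito.when(ctx.getStatsCollector()).thenReturn(collector);
        // Operators now pull their stats object from the collector by display name.
        Mockito.when(collector.getOrAddOperatorStats(Mockito.anyString())).thenReturn(stats);
        return ctx;
    }
}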
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
index 262570a..31463b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
@@ -92,11 +92,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-control-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index d504a96..142e879 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -32,7 +32,6 @@
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.ExceptionUtils;
-import org.apache.hyracks.control.common.job.profiling.OperatorStats;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -93,7 +92,7 @@
     protected ArrayTupleBuilder nonFilterTupleBuild;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
     protected boolean failed = false;
-    private final IOperatorStats stats;
+    private IOperatorStats stats;
 
     // Used when the result of the search operation callback needs to be passed.
     protected boolean appendSearchCallbackProceedResult;
@@ -150,13 +149,13 @@
         this.appendSearchCallbackProceedResult = appendSearchCallbackProceedResult;
         this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
         this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
-        stats = new OperatorStats(getDisplayName());
-        if (ctx.getStatsCollector() != null) {
-            ctx.getStatsCollector().add(stats);
-        }
         this.tupleFilterFactory = tupleFactoryFactory;
         this.outputLimit = outputLimit;
 
+        if (ctx != null && ctx.getStatsCollector() != null) {
+            stats = ctx.getStatsCollector().getOrAddOperatorStats(getDisplayName());
+        }
+
         if (this.tupleFilterFactory != null && this.retainMissing) {
             throw new IllegalStateException("RetainMissing with tuple filter is not supported");
         }
@@ -435,4 +434,9 @@
 
     }
 
+    @Override
+    public String getDisplayName() {
+        return "Index Search";
+    }
+
 }
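
The hunk above shows the pattern operators follow for per-operator profiling: report a display name and fetch (or lazily register) the matching IOperatorStats from the task context's collector instead of constructing one locally. A condensed sketch of that pattern; the class name and the guard structure mirror the change, but are illustrative rather than part of the patch:

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.job.profiling.IOperatorStats;

public class ProfiledOperatorSketch {
    private IOperatorStats stats;

    public ProfiledOperatorSketch(IHyracksTaskContext ctx) {
        // Guard against contexts without a collector (e.g. some unit tests).
        if (ctx != null && ctx.getStatsCollector() != null) {
            stats = ctx.getStatsCollector().getOrAddOperatorStats(getDisplayName());
        }
    }

    // The display name is the label this operator carries in the job profile.
    public String getDisplayName() {
        return "Index Search";
    }
}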
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java
index 7cbf948..267c69e 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/CounterContext.java
@@ -21,9 +21,9 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hyracks.api.com.job.profiling.counters.Counter;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
-import org.apache.hyracks.control.common.job.profiling.counters.Counter;
 
 public class CounterContext implements ICounterContext {
     private final String contextName;