[ASTERIXDB-2393][COMP][RT] Add source location to error messages

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add source locations to compiler and runtime error messages
- Add source location enforcement to the test framework
  and enable it for SqlppExecutionTest

Change-Id: Ie279ce345d1edcb5dea8e55cdb0233151c7bfd66
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2659
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
index 54292c3..46e80be 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
@@ -22,6 +22,7 @@
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.IFormattedException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
 public class AlgebricksException extends Exception implements IFormattedException {
@@ -32,15 +33,17 @@
     private final int errorCode;
     private final Serializable[] params;
     private final String nodeId;
+    private final SourceLocation sourceLoc;
 
     @SuppressWarnings("squid:S1165") // exception class not final
     private transient CachedMessage msgCache;
 
-    public AlgebricksException(String component, int errorCode, String message, Throwable cause, String nodeId,
-            Serializable... params) {
+    public AlgebricksException(String component, int errorCode, String message, Throwable cause,
+            SourceLocation sourceLoc, String nodeId, Serializable... params) {
         super(message, cause);
         this.component = component;
         this.errorCode = errorCode;
+        this.sourceLoc = sourceLoc;
         this.nodeId = nodeId;
         this.params = params;
     }
@@ -50,7 +53,7 @@
      */
     @Deprecated
     public AlgebricksException(String message) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, (Serializable[]) null);
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null, (Serializable[]) null);
     }
 
     /**
@@ -66,32 +69,60 @@
      */
     @Deprecated
     public AlgebricksException(String message, Throwable cause) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, (String) null);
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, null, (Serializable[]) null);
+    }
+
+    public AlgebricksException(String component, int errorCode, SourceLocation sourceLoc, Serializable... params) {
+        this(component, errorCode, null, null, sourceLoc, null, params);
     }
 
     public AlgebricksException(String component, int errorCode, Serializable... params) {
-        this(component, errorCode, null, null, null, params);
+        this(component, errorCode, null, null, null, null, params);
+    }
+
+    public AlgebricksException(Throwable cause, int errorCode, SourceLocation sourceLoc, Serializable... params) {
+        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, sourceLoc, null, params);
     }
 
     public AlgebricksException(Throwable cause, int errorCode, Serializable... params) {
-        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+        this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, null, params);
+    }
+
+    public AlgebricksException(String component, int errorCode, String message, SourceLocation sourceLoc,
+            Serializable... params) {
+        this(component, errorCode, message, null, sourceLoc, null, params);
     }
 
     public AlgebricksException(String component, int errorCode, String message, Serializable... params) {
-        this(component, errorCode, message, null, null, params);
+        this(component, errorCode, message, null, null, null, params);
+    }
+
+    public AlgebricksException(String component, int errorCode, Throwable cause, SourceLocation sourceLoc,
+            Serializable... params) {
+        this(component, errorCode, cause.getMessage(), cause, sourceLoc, null, params);
     }
 
     public AlgebricksException(String component, int errorCode, Throwable cause, Serializable... params) {
-        this(component, errorCode, cause.getMessage(), cause, null, params);
+        this(component, errorCode, cause.getMessage(), cause, null, null, params);
+    }
+
+    public AlgebricksException(String component, int errorCode, String message, Throwable cause,
+            SourceLocation sourceLoc, Serializable... params) {
+        this(component, errorCode, message, cause, sourceLoc, null, params);
     }
 
     public AlgebricksException(String component, int errorCode, String message, Throwable cause,
             Serializable... params) {
-        this(component, errorCode, message, cause, null, params);
+        this(component, errorCode, message, cause, null, null, params);
+    }
+
+    public static AlgebricksException create(int errorCode, SourceLocation sourceLoc, Serializable... params) {
+        return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), sourceLoc,
+                params);
     }
 
     public static AlgebricksException create(int errorCode, Serializable... params) {
-        return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), params);
+        return create(errorCode, null, params);
     }
 
     @Override
@@ -112,11 +143,15 @@
         return nodeId;
     }
 
+    public SourceLocation getSourceLocation() {
+        return sourceLoc;
+    }
+
     @Override
     public String getMessage() {
         if (msgCache == null) {
-            msgCache =
-                    new CachedMessage(ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params));
+            msgCache = new CachedMessage(
+                    ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), sourceLoc, params));
         }
         return msgCache.message;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/NotImplementedException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/NotImplementedException.java
index 16bc73e..d308969 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/NotImplementedException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/NotImplementedException.java
@@ -22,18 +22,10 @@
     private static final long serialVersionUID = 2L;
 
     public NotImplementedException() {
-        System.err.println("Not implemented.");
+        super("Not implemented.");
     }
 
     public NotImplementedException(String message) {
         super(message);
     }
-
-    public NotImplementedException(Throwable cause) {
-        super(cause);
-    }
-
-    public NotImplementedException(String message, Throwable cause) {
-        super(message, cause);
-    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
index 7e10203..1de7e3c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public interface ILogicalExpression {
 
@@ -73,4 +74,6 @@
     public abstract ILogicalExpression cloneExpression();
 
     public boolean isFunctional();
+
+    SourceLocation getSourceLocation();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index dd7e065..6bd0d02 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public interface ILogicalOperator {
 
@@ -103,4 +104,6 @@
      * Indicates whether the expressions used by this operator must be variable reference expressions.
      */
     public boolean requiresVariableReferenceExpressions();
+
+    SourceLocation getSourceLocation();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
index 0717c0e..de4d5ac 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
@@ -25,9 +25,12 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public abstract class AbstractLogicalExpression implements ILogicalExpression {
 
+    protected SourceLocation sourceLoc;
+
     @Override
     public void getConstraintsAndEquivClasses(Collection<FunctionalDependency> fds,
             Map<LogicalVariable, EquivalenceClass> equivClasses) {
@@ -45,4 +48,12 @@
         return true;
     }
 
+    @Override
+    public SourceLocation getSourceLocation() {
+        return sourceLoc;
+    }
+
+    public void setSourceLocation(SourceLocation sourceLoc) {
+        this.sourceLoc = sourceLoc;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
index 8fca47c..bdd820e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
@@ -59,6 +59,7 @@
         AggregateFunctionCallExpression fun = new AggregateFunctionCallExpression(finfo, twoStep, clonedArgs);
         fun.setStepTwoAggregate(stepTwoAggregate);
         fun.setStepOneAggregate(stepOneAggregate);
+        fun.setSourceLocation(sourceLoc);
         return fun;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
index 689d51c..42ff3c0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
@@ -174,6 +174,7 @@
         annotationMap.forEach((key, value1) -> m.put(key, value1.copy()));
         ConstantExpression c = new ConstantExpression(value);
         c.annotationMap = m;
+        c.setSourceLocation(sourceLoc);
         return c;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
index 6308636..f8b25e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
@@ -47,6 +47,7 @@
         ScalarFunctionCallExpression funcExpr = new ScalarFunctionCallExpression(finfo, clonedArgs);
         funcExpr.getAnnotations().putAll(cloneAnnotations());
         funcExpr.setOpaqueParameters(this.getOpaqueParameters());
+        funcExpr.setSourceLocation(sourceLoc);
         return funcExpr;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java
index 661c389..0d310f7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/StatefulFunctionCallExpression.java
@@ -52,7 +52,10 @@
     public StatefulFunctionCallExpression cloneExpression() {
         cloneAnnotations();
         List<Mutable<ILogicalExpression>> clonedArgs = cloneArguments();
-        return new StatefulFunctionCallExpression(finfo, propertiesComputer, clonedArgs);
+        StatefulFunctionCallExpression clonedExpr =
+                new StatefulFunctionCallExpression(finfo, propertiesComputer, clonedArgs);
+        clonedExpr.setSourceLocation(sourceLoc);
+        return clonedExpr;
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
index ac539ec..3dad669 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
@@ -50,6 +50,7 @@
         UnnestingFunctionCallExpression ufce = new UnnestingFunctionCallExpression(finfo, clonedArgs);
         ufce.setReturnsUniqueValues(returnsUniqueValues);
         ufce.setOpaqueParameters(this.getOpaqueParameters());
+        ufce.setSourceLocation(sourceLoc);
         return ufce;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
index c1342cc..1c7da34 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/VariableReferenceExpression.java
@@ -95,7 +95,9 @@
 
     @Override
     public AbstractLogicalExpression cloneExpression() {
-        return new VariableReferenceExpression(variable);
+        VariableReferenceExpression varRef = new VariableReferenceExpression(variable);
+        varRef.setSourceLocation(sourceLoc);
+        return varRef;
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 4686f32..1dbf15e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;
 import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public abstract class AbstractLogicalOperator implements ILogicalOperator {
 
@@ -69,6 +70,8 @@
     protected final List<Mutable<ILogicalOperator>> inputs;
     protected List<LogicalVariable> schema;
 
+    private SourceLocation sourceLoc;
+
     public AbstractLogicalOperator() {
         inputs = new ArrayList<>();
     }
@@ -195,4 +198,13 @@
     public boolean requiresVariableReferenceExpressions() {
         return true;
     }
+
+    @Override
+    public SourceLocation getSourceLocation() {
+        return sourceLoc;
+    }
+
+    public void setSourceLocation(SourceLocation sourceLoc) {
+        this.sourceLoc = sourceLoc;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index 24c5162..6be8ee0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IVariableContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
@@ -80,6 +81,10 @@
         dest.setOpaqueParameters(newOpaqueParameters);
     }
 
+    private void copySourceLocation(ILogicalExpression src, AbstractLogicalExpression dest) {
+        dest.setSourceLocation(src.getSourceLocation());
+    }
+
     public MutableObject<ILogicalExpression> deepCopyExpressionReference(Mutable<ILogicalExpression> exprRef)
             throws AlgebricksException {
         return new MutableObject<>(deepCopy(exprRef.getValue()));
@@ -104,12 +109,15 @@
         exprCopy.setStepTwoAggregate(expr.getStepTwoAggregate());
         deepCopyAnnotations(expr, exprCopy);
         deepCopyOpaqueParameters(expr, exprCopy);
+        copySourceLocation(expr, exprCopy);
         return exprCopy;
     }
 
     @Override
     public ILogicalExpression visitConstantExpression(ConstantExpression expr, Void arg) throws AlgebricksException {
-        return new ConstantExpression(expr.getValue());
+        ConstantExpression exprCopy = new ConstantExpression(expr.getValue());
+        copySourceLocation(expr, exprCopy);
+        return exprCopy;
     }
 
     @Override
@@ -119,6 +127,7 @@
                 deepCopyExpressionReferenceList(expr.getArguments()));
         deepCopyAnnotations(expr, exprCopy);
         deepCopyOpaqueParameters(expr, exprCopy);
+        copySourceLocation(expr, exprCopy);
         return exprCopy;
 
     }
@@ -130,6 +139,7 @@
                 expr.getPropertiesComputer(), deepCopyExpressionReferenceList(expr.getArguments()));
         deepCopyAnnotations(expr, exprCopy);
         deepCopyOpaqueParameters(expr, exprCopy);
+        copySourceLocation(expr, exprCopy);
         return exprCopy;
     }
 
@@ -140,6 +150,7 @@
                 deepCopyExpressionReferenceList(expr.getArguments()));
         deepCopyAnnotations(expr, exprCopy);
         deepCopyOpaqueParameters(expr, exprCopy);
+        copySourceLocation(expr, exprCopy);
         return exprCopy;
     }
 
@@ -153,13 +164,17 @@
         LogicalVariable givenVarReplacement = inVarMapping.get(var);
         if (givenVarReplacement != null) {
             outVarMapping.put(var, givenVarReplacement);
-            return new VariableReferenceExpression(givenVarReplacement);
+            VariableReferenceExpression varRef = new VariableReferenceExpression(givenVarReplacement);
+            copySourceLocation(expr, varRef);
+            return varRef;
         }
         LogicalVariable varCopy = outVarMapping.get(var);
         if (varCopy == null) {
             varCopy = varContext.newVar();
             outVarMapping.put(var, varCopy);
         }
-        return new VariableReferenceExpression(varCopy);
+        VariableReferenceExpression varRef = new VariableReferenceExpression(varCopy);
+        copySourceLocation(expr, varRef);
+        return varRef;
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index d6062ee..e0210cc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -146,6 +146,10 @@
         dest.getAnnotations().putAll(src.getAnnotations());
     }
 
+    private void copySourceLocation(ILogicalOperator src, AbstractLogicalOperator dest) {
+        dest.setSourceLocation(src.getSourceLocation());
+    }
+
     public ILogicalOperator deepCopy(ILogicalOperator op) throws AlgebricksException {
         // The deep copy call outside this visitor always has a null argument.
         return deepCopy(op, null);
@@ -269,6 +273,7 @@
             AbstractLogicalOperator opCopy) throws AlgebricksException {
         deepCopyInputs(op, opCopy, arg);
         copyAnnotations(op, opCopy);
+        copySourceLocation(op, opCopy);
         opCopy.setExecutionMode(op.getExecutionMode());
     }
 
@@ -337,6 +342,7 @@
     @Override
     public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalOperator arg) {
         EmptyTupleSourceOperator opCopy = new EmptyTupleSourceOperator();
+        copySourceLocation(op, opCopy);
         opCopy.setExecutionMode(op.getExecutionMode());
         return opCopy;
     }
@@ -371,6 +377,7 @@
                         deepCopyOperatorReference(op.getInputs().get(0), arg),
                         deepCopyOperatorReference(op.getInputs().get(1), arg));
         copyAnnotations(op, opCopy);
+        copySourceLocation(op, opCopy);
         opCopy.setExecutionMode(op.getExecutionMode());
         return opCopy;
     }
@@ -383,6 +390,7 @@
                         deepCopyOperatorReference(op.getInputs().get(0), arg),
                         deepCopyOperatorReference(op.getInputs().get(1), arg));
         copyAnnotations(op, opCopy);
+        copySourceLocation(op, opCopy);
         opCopy.setExecutionMode(op.getExecutionMode());
         return opCopy;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
index 4a177f7..0602258 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnnestPOperator.java
@@ -81,6 +81,7 @@
         int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
         UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
                 unnest.getPositionWriter(), leftOuter, context.getMissingWriterFactory());
+        unnestRuntime.setSourceLocation(unnest.getSourceLocation());
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
         ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 147d5cc..2736b6f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -102,6 +102,7 @@
         }
 
         AggregateRuntimeFactory runtime = new AggregateRuntimeFactory(aggFactories);
+        runtime.setSourceLocation(aggOp.getSourceLocation());
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index ccd27f4..2df5a4e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -90,6 +90,7 @@
 
         AssignRuntimeFactory runtime =
                 new AssignRuntimeFactory(outColumns, evalFactories, projectionList, flushFramesRapidly);
+        runtime.setSourceLocation(assign.getSourceLocation());
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
@@ -103,7 +104,6 @@
         // and contribute one edge from its child
         ILogicalOperator src = assign.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, assign, 0);
-
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 2204637..bb0f08b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -107,8 +107,10 @@
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
                 mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, primaryKeys, payload,
                         additionalFilteringKeys, additionalNonFilterVars, inputDesc, context, spec, true);
-        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        IOperatorDescriptor opDesc = runtimeAndConstraints.first;
+        opDesc.setSourceLocation(insertDeleteOp.getSourceLocation());
+        builder.contributeHyracksOperator(insertDeleteOp, opDesc);
+        builder.contributeAlgebricksPartitionConstraint(opDesc, runtimeAndConstraints.second);
         ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
index e8c5c64..3ddb233 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DataSourceScanPOperator.java
@@ -119,9 +119,11 @@
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = mp.getScannerRuntime(dataSource, vars, projectVars,
                 scan.isProjectPushed(), scan.getMinFilterVars(), scan.getMaxFilterVars(), tupleFilterFactory,
                 scan.getOutputLimit(), opSchema, typeEnv, context, builder.getJobSpec(), implConfig);
-        builder.contributeHyracksOperator(scan, p.first);
+        IOperatorDescriptor opDesc = p.first;
+        opDesc.setSourceLocation(scan.getSourceLocation());
+        builder.contributeHyracksOperator(scan, opDesc);
         if (p.second != null) {
-            builder.contributeAlgebricksPartitionConstraint(p.first, p.second);
+            builder.contributeAlgebricksPartitionConstraint(opDesc, p.second);
         }
 
         ILogicalOperator srcExchange = scan.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 178f2a1..34dc0d4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -105,8 +105,10 @@
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
                 mp.getResultHandleRuntime(resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
+        IOperatorDescriptor opDesc = runtimeAndConstraints.first;
+        opDesc.setSourceLocation(resultOp.getSourceLocation());
+        builder.contributeHyracksOperator(resultOp, opDesc);
 
-        builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
         ILogicalOperator src = resultOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, resultOp, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
index 0718d13..502d022 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
@@ -63,6 +63,7 @@
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         EmptyTupleSourceRuntimeFactory runtime = new EmptyTupleSourceRuntimeFactory();
+        runtime.setSourceLocation(op.getSourceLocation());
         RecordDescriptor recDesc = new RecordDescriptor(new ISerializerDeserializer[] {});
         builder.contributeMicroOperator(op, runtime, recDesc);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 5ee967d..edc9d08 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -62,6 +62,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
@@ -253,8 +254,10 @@
             merges[i] = expressionRuntimeProvider.createSerializableAggregateFunctionFactory(mergeFun, aggOpInputEnv,
                     localInputSchemas, context);
         }
-        IAggregatorDescriptorFactory aggregatorFactory = new SerializableAggregatorDescriptorFactory(aff);
-        IAggregatorDescriptorFactory mergeFactory = new SerializableAggregatorDescriptorFactory(merges);
+        AbstractAggregatorDescriptorFactory aggregatorFactory = new SerializableAggregatorDescriptorFactory(aff);
+        aggregatorFactory.setSourceLocation(gby.getSourceLocation());
+        AbstractAggregatorDescriptorFactory mergeFactory = new SerializableAggregatorDescriptorFactory(merges);
+        mergeFactory.setSourceLocation(gby.getSourceLocation());
 
         INormalizedKeyComputerFactory normalizedKeyFactory =
                 JobGenHelper.variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
@@ -268,6 +271,7 @@
         ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
                 keyAndDecFields, frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
                 recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
+        gbyOpDesc.setSourceLocation(gby.getSourceLocation());
         contributeOpDesc(builder, gby, gbyOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 301b8f1..d06dde5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -152,6 +152,7 @@
             opDesc = generateHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFactories,
                     comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
         }
+        opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
         ILogicalOperator src1 = op.getInputs().get(0).getValue();
@@ -164,68 +165,49 @@
             int[] keysLeft, int[] keysRight, IBinaryHashFunctionFactory[] hashFunFactories,
             IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
             RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
-        IOperatorDescriptor opDesc;
-        try {
-            switch (kind) {
-                case INNER:
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                            aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                            comparatorFactories, recDescriptor, predEvaluatorFactory, false, null);
-                    break;
-                case LEFT_OUTER:
-                    IMissingWriterFactory[] nonMatchWriterFactories =
-                            new IMissingWriterFactory[inputSchemas[1].getSize()];
-                    for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-                        nonMatchWriterFactories[j] = context.getMissingWriterFactory();
-                    }
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
-                            aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
-                            comparatorFactories, recDescriptor, predEvaluatorFactory, true, nonMatchWriterFactories);
-                    break;
-                default:
-                    throw new NotImplementedException();
-            }
-        } catch (HyracksDataException e) {
-            throw new AlgebricksException(e);
+        switch (kind) {
+            case INNER:
+                return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
+                        aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
+                        comparatorFactories, recDescriptor, predEvaluatorFactory, false, null);
+            case LEFT_OUTER:
+                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
+                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
+                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
+                }
+                return new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(), maxInputBuildSizeInFrames,
+                        aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight, hashFunFactories,
+                        comparatorFactories, recDescriptor, predEvaluatorFactory, true, nonMatchWriterFactories);
+            default:
+                throw new NotImplementedException();
         }
-        return opDesc;
     }
 
     private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
             int[] keysLeft, int[] keysRight, IBinaryHashFunctionFamily[] hashFunFamilies,
             IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
             RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
-        IOperatorDescriptor opDesc;
-        try {
-            switch (kind) {
-                case INNER:
-                    opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                            maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                            comparatorFactories, recDescriptor,
-                            new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
-                            new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
-                            predEvaluatorFactory);
-                    break;
-                case LEFT_OUTER:
-                    IMissingWriterFactory[] nonMatchWriterFactories =
-                            new IMissingWriterFactory[inputSchemas[1].getSize()];
-                    for (int j = 0; j < nonMatchWriterFactories.length; j++) {
-                        nonMatchWriterFactories[j] = context.getMissingWriterFactory();
-                    }
-                    opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                            maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                            comparatorFactories, recDescriptor,
-                            new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
-                            new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
-                            predEvaluatorFactory, true, nonMatchWriterFactories);
-                    break;
-                default:
-                    throw new NotImplementedException();
-            }
-        } catch (HyracksDataException e) {
-            throw new AlgebricksException(e);
+        switch (kind) {
+            case INNER:
+                return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                        maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                        comparatorFactories, recDescriptor,
+                        new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+                        new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft), predEvaluatorFactory);
+            case LEFT_OUTER:
+                IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
+                for (int j = 0; j < nonMatchWriterFactories.length; j++) {
+                    nonMatchWriterFactories[j] = context.getMissingWriterFactory();
+                }
+                return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                        maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                        comparatorFactories, recDescriptor,
+                        new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+                        new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft), predEvaluatorFactory,
+                        true, nonMatchWriterFactories);
+            default:
+                throw new NotImplementedException();
         }
-        return opDesc;
     }
 
     @Override
@@ -264,7 +246,7 @@
 }
 
 /**
- * {@ ITuplePairComparatorFactory} implementation for optimized hybrid hash join.
+ * {@code ITuplePairComparatorFactory} implementation for optimized hybrid hash join.
  */
 class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
     private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 9c29c53..580d8e1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -105,15 +105,14 @@
         RecordDescriptor recDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        IOperatorDescriptor opDesc = null;
+        IOperatorDescriptor opDesc;
 
         switch (kind) {
-            case INNER: {
+            case INNER:
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
                         comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory, memSizeInFrames);
                 break;
-            }
-            case LEFT_OUTER: {
+            case LEFT_OUTER:
                 IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
                 for (int j = 0; j < nonMatchWriterFactories.length; j++) {
                     nonMatchWriterFactories[j] = context.getMissingWriterFactory();
@@ -122,11 +121,11 @@
                         comparatorFactories, predEvaluatorFactory, recDescriptor, true, nonMatchWriterFactories,
                         tableSize, memSizeInFrames);
                 break;
-            }
-            default: {
+            default:
                 throw new NotImplementedException();
-            }
         }
+
+        opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
         ILogicalOperator src1 = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index fa0fb1a..6512700 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -135,8 +135,10 @@
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
                 mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys,
                         secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, true);
-        builder.contributeHyracksOperator(indexInsertDeleteOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        IOperatorDescriptor opDesc = runtimeAndConstraints.first;
+        opDesc.setSourceLocation(indexInsertDeleteOp.getSourceLocation());
+        builder.contributeHyracksOperator(indexInsertDeleteOp, opDesc);
+        builder.contributeAlgebricksPartitionConstraint(opDesc, runtimeAndConstraints.second);
         ILogicalOperator src = indexInsertDeleteOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, indexInsertDeleteOp, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index a66db35..92fa86d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -118,19 +118,30 @@
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
         IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteUpsertOp);
-        if (insertDeleteUpsertOp.getOperation() == Kind.INSERT) {
-            runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
-        } else if (insertDeleteUpsertOp.getOperation() == Kind.DELETE) {
-            runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
-        } else if (insertDeleteUpsertOp.getOperation() == Kind.UPSERT) {
-            runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys,
-                    prevAdditionalFilteringKey, inputDesc, context, spec);
+        Kind operation = insertDeleteUpsertOp.getOperation();
+        switch (operation) {
+            case INSERT:
+                runtimeAndConstraints =
+                        mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys,
+                                secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
+                break;
+            case DELETE:
+                runtimeAndConstraints =
+                        mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys,
+                                secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+                break;
+            case UPSERT:
+                runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas,
+                        typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys,
+                        prevAdditionalFilteringKey, inputDesc, context, spec);
+                break;
+            default:
+                throw new AlgebricksException("Unsupported Operation " + operation);
         }
-        builder.contributeHyracksOperator(insertDeleteUpsertOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        IOperatorDescriptor opDesc = runtimeAndConstraints.first;
+        opDesc.setSourceLocation(insertDeleteUpsertOp.getSourceLocation());
+        builder.contributeHyracksOperator(insertDeleteUpsertOp, opDesc);
+        builder.contributeAlgebricksPartitionConstraint(opDesc, runtimeAndConstraints.second);
         ILogicalOperator src = insertDeleteUpsertOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, insertDeleteUpsertOp, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 6ded4a3..927fb66 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -106,22 +106,28 @@
         RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
 
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
-        if (operation == Kind.INSERT) {
-            runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
-                    additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec, false);
-        } else if (operation == Kind.DELETE) {
-            runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
-                    additionalFilteringKeys, inputDesc, context, spec);
-        } else if (operation == Kind.UPSERT) {
-            runtimeAndConstraints = mp.getUpsertRuntime(dataSource, inputSchemas[0], typeEnv, keys, payload,
-                    additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec);
-        } else {
-            throw new AlgebricksException("Unsupported Operation " + operation);
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints;
+        switch (operation) {
+            case INSERT:
+                runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                        additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec, false);
+                break;
+            case DELETE:
+                runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+                        additionalFilteringKeys, inputDesc, context, spec);
+                break;
+            case UPSERT:
+                runtimeAndConstraints = mp.getUpsertRuntime(dataSource, inputSchemas[0], typeEnv, keys, payload,
+                        additionalFilteringKeys, additionalNonFilteringFields, inputDesc, context, spec);
+                break;
+            default:
+                throw new AlgebricksException("Unsupported Operation " + operation);
         }
 
-        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        IOperatorDescriptor opDesc = runtimeAndConstraints.first;
+        opDesc.setSourceLocation(insertDeleteOp.getSourceLocation());
+        builder.contributeHyracksOperator(insertDeleteOp, opDesc);
+        builder.contributeAlgebricksPartitionConstraint(opDesc, runtimeAndConstraints.second);
         ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index 1d36cc0..544c546 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -151,7 +151,7 @@
         } catch (HyracksException e) {
             throw new AlgebricksException(e);
         }
-
+        opDescriptor.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDescriptor);
         for (int i = 0; i < op.getInputs().size(); i++) {
             builder.contributeGraphEdge(op.getInputs().get(i).getValue(), 0, op, i);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
index a48e3c2..fe8985a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -74,6 +74,7 @@
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         MaterializingOperatorDescriptor materializationOpDesc =
                 new MaterializingOperatorDescriptor(builder.getJobSpec(), recDescriptor, isSingleActivity);
+        materializationOpDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, materializationOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
index 94d5fd4..fda879c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
@@ -34,7 +34,7 @@
 import org.apache.hyracks.algebricks.runtime.operators.group.MicroPreClusteredGroupRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 
 public class MicroPreSortedDistinctByPOperator extends AbstractPreSortedDistinctByPOperator {
 
@@ -62,8 +62,9 @@
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
                 .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context);
         IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
-        IAggregatorDescriptorFactory aggregatorFactory =
+        AbstractAggregatorDescriptorFactory aggregatorFactory =
                 new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories, keysAndDecs);
+        aggregatorFactory.setSourceLocation(op.getSourceLocation());
 
         RecordDescriptor recordDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
@@ -73,6 +74,7 @@
         /* make fd columns part of the key but the comparator only compares the distinct key columns */
         MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keysAndDecs,
                 comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+        runtime.setSourceLocation(op.getSourceLocation());
         builder.contributeMicroOperator(op, runtime, recordDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
index 629afa3..13308a1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -74,6 +74,7 @@
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
         MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys,
                 comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+        runtime.setSourceLocation(gby.getSourceLocation());
         builder.contributeMicroOperator(gby, runtime, recordDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
index f5e992e..da48049 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
@@ -49,6 +49,7 @@
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
 
         MicroUnionAllRuntimeFactory runtime = new MicroUnionAllRuntimeFactory(op.getInputs().size());
+        runtime.setSourceLocation(op.getSourceLocation());
         builder.contributeMicroOperator(op, runtime, recordDescriptor);
 
         super.contributeRuntimeOperator(builder, context, op, opSchema, inputSchemas, outerPlanSchema);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
index 4d7bd7e1..83cea4a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedLoopJoinPOperator.java
@@ -139,15 +139,14 @@
         ITuplePairComparatorFactory comparatorFactory =
                 new TuplePairEvaluatorFactory(cond, context.getBinaryBooleanInspectorFactory());
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        IOperatorDescriptor opDesc = null;
+        IOperatorDescriptor opDesc;
 
         switch (kind) {
-            case INNER: {
+            case INNER:
                 opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
                         null);
                 break;
-            }
-            case LEFT_OUTER: {
+            case LEFT_OUTER:
                 IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
                 for (int j = 0; j < nonMatchWriterFactories.length; j++) {
                     nonMatchWriterFactories[j] = context.getMissingWriterFactory();
@@ -155,12 +154,12 @@
                 opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
                         nonMatchWriterFactories);
                 break;
-            }
-            default: {
+            default:
                 throw new NotImplementedException();
-            }
         }
-        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+        opDesc.setSourceLocation(join.getSourceLocation());
+        contributeOpDesc(builder, join, opDesc);
 
         ILogicalOperator src1 = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src1, 0, op, 0);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index 179bb73..7971e78 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -98,6 +98,7 @@
             throws AlgebricksException {
         propagatedSchema.addAllVariables(outerPlanSchema);
         NestedTupleSourceRuntimeFactory runtime = new NestedTupleSourceRuntimeFactory();
+        runtime.setSourceLocation(op.getSourceLocation());
         RecordDescriptor recDesc =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         builder.contributeMicroOperator(op, runtime, recDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index 54e577f..dd4c65f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -34,7 +34,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 
 public class PreSortedDistinctByPOperator extends AbstractPreSortedDistinctByPOperator {
@@ -64,14 +64,16 @@
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
                 .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context);
         IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
-        IAggregatorDescriptorFactory aggregatorFactory =
+        AbstractAggregatorDescriptorFactory aggregatorFactory =
                 new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories, keysAndDecs);
+        aggregatorFactory.setSourceLocation(op.getSourceLocation());
 
         RecordDescriptor recordDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         /* make fd columns part of the key but the comparator only compares the distinct key columns */
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
                 comparatorFactories, aggregatorFactory, recordDescriptor);
+        opDesc.setSourceLocation(op.getSourceLocation());
 
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index 0e0953c..e5076ce 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -38,7 +37,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
 
 public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
@@ -71,7 +70,7 @@
         int fdColumns[] = getFdColumns(gby, inputSchemas[0]);
         // compile subplans and set the gby op. schema accordingly
         AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], gby, opSchema, context);
-        IAggregatorDescriptorFactory aggregatorFactory;
+        AbstractAggregatorDescriptorFactory aggregatorFactory;
 
         List<ILogicalPlan> nestedPlans = gby.getNestedPlans();
         if (!nestedPlans.isEmpty() && nestedPlans.get(0).getRoots().get(0).getValue()
@@ -80,6 +79,7 @@
         } else {
             aggregatorFactory = new NestedPlansAccumulatingAggregatorFactory(subplans, keys, fdColumns);
         }
+        aggregatorFactory.setSourceLocation(gby.getSourceLocation());
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
@@ -89,8 +89,9 @@
 
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys,
                 comparatorFactories, aggregatorFactory, recordDescriptor, groupAll, framesLimit);
+        opDesc.setSourceLocation(gby.getSourceLocation());
 
-        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+        contributeOpDesc(builder, gby, opDesc);
 
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 25d31d2..f34cecc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -22,7 +22,6 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -50,9 +49,10 @@
         int outputArity = rop.getOutputArity();
         boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
 
-        ReplicateOperatorDescriptor splitOpDesc =
+        ReplicateOperatorDescriptor ropDesc =
                 new ReplicateOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags);
-        contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+        ropDesc.setSourceLocation(rop.getSourceLocation());
+        contributeOpDesc(builder, rop, ropDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 3a6ba74..d68be20 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -91,6 +91,7 @@
 
         RunningAggregateRuntimeFactory runtime =
                 new RunningAggregateRuntimeFactory(outColumns, runningAggFuns, projectionList);
+        runtime.setSourceLocation(ragg.getSourceLocation());
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 5084c18..6c9f6e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -79,6 +79,7 @@
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
 
         SinkOperatorDescriptor opDesc = new SinkOperatorDescriptor(spec, op.getInputs().size());
+        opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
         for (int i = 0; i < op.getInputs().size(); i++) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index f76b69b..6e37b3a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -101,10 +101,12 @@
 
         IMetadataProvider<?, ?> mp = context.getMetadataProvider();
 
-        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtime =
+        Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints =
                 mp.getWriteFileRuntime(write.getDataSink(), columns, pf, inputDesc);
+        IPushRuntimeFactory runtime = runtimeAndConstraints.first;
+        runtime.setSourceLocation(write.getSourceLocation());
 
-        builder.contributeMicroOperator(write, runtime.first, recDesc, runtime.second);
+        builder.contributeMicroOperator(write, runtime, recDesc, runtimeAndConstraints.second);
         ILogicalOperator src = write.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, write, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index 1aeeca9..967e7e6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -63,7 +63,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
 
 public class SortGroupByPOperator extends AbstractPhysicalOperator {
@@ -258,10 +258,12 @@
         RecordDescriptor partialAggRecordDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), localInputSchemas[0], context);
 
-        IAggregatorDescriptorFactory aggregatorFactory =
+        AbstractAggregatorDescriptorFactory aggregatorFactory =
                 new SimpleAlgebricksAccumulatingAggregatorFactory(aff, keyAndDecFields);
-        IAggregatorDescriptorFactory mergeFactory =
+        aggregatorFactory.setSourceLocation(gby.getSourceLocation());
+        AbstractAggregatorDescriptorFactory mergeFactory =
                 new SimpleAlgebricksAccumulatingAggregatorFactory(merges, keyAndDecFields);
+        mergeFactory.setSourceLocation(gby.getSourceLocation());
 
         INormalizedKeyComputerFactory normalizedKeyFactory = null;
         INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
@@ -275,6 +277,7 @@
         SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, frameLimit, keys,
                 keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
                 partialAggRecordDescriptor, recordDescriptor, false);
+        gbyOpDesc.setSourceLocation(gby.getSourceLocation());
 
         contributeOpDesc(builder, gby, gbyOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
index c9fde4b..2f02eba 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
@@ -60,10 +60,11 @@
 
         IBinaryIntegerInspectorFactory intInsepctorFactory = context.getBinaryIntegerInspectorFactory();
 
-        SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity,
+        SplitOperatorDescriptor sopDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity,
                 brachingExprEvalFactory, intInsepctorFactory, defaultBranch, propageToAllBranchAsDefault);
+        sopDesc.setSourceLocation(sop.getSourceLocation());
 
-        contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+        contributeOpDesc(builder, sop, sopDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
index 3a4249b..269a809 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StableSortPOperator.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.sort.AbstractSorterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
 
@@ -98,22 +99,21 @@
             i++;
         }
 
+        AbstractSorterOperatorDescriptor sortOpDesc;
         // topK == -1 means that a topK value is not provided.
         if (topK == -1) {
-            ExternalSortOperatorDescriptor sortOpDesc =
+            sortOpDesc =
                     new ExternalSortOperatorDescriptor(spec, maxNumberOfFrames, sortFields, nkcf, comps, recDescriptor);
-            contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
-            ILogicalOperator src = op.getInputs().get(0).getValue();
-            builder.contributeGraphEdge(src, 0, op, 0);
         } else {
             // Since topK value is provided, topK optimization is possible.
             // We call topKSorter instead of calling ExternalSortOperator.
-            TopKSorterOperatorDescriptor sortOpDesc = new TopKSorterOperatorDescriptor(spec, maxNumberOfFrames, topK,
-                    sortFields, nkcf, comps, recDescriptor);
-            contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
-            ILogicalOperator src = op.getInputs().get(0).getValue();
-            builder.contributeGraphEdge(src, 0, op, 0);
+            sortOpDesc = new TopKSorterOperatorDescriptor(spec, maxNumberOfFrames, topK, sortFields, nkcf, comps,
+                    recDescriptor);
         }
+        sortOpDesc.setSourceLocation(op.getSourceLocation());
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, sortOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index da75da8..90732ce 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -98,6 +98,7 @@
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         StreamLimitRuntimeFactory runtime = new StreamLimitRuntimeFactory(maxObjectsFact, offsetFact, null,
                 context.getBinaryIntegerInspectorFactory());
+        runtime.setSourceLocation(limit.getSourceLocation());
         builder.contributeMicroOperator(limit, runtime, recDesc);
         // and contribute one edge from its child
         ILogicalOperator src = limit.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 3ff7dc1..0789cee 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -68,6 +68,7 @@
             projectionList[i++] = pos;
         }
         StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList, flushFramesRapidly);
+        runtime.setSourceLocation(project.getSourceLocation());
         RecordDescriptor recDesc =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         builder.contributeMicroOperator(project, runtime, recDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index ddde5f3..a519275 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -70,6 +70,7 @@
                 new StreamSelectRuntimeFactory(cond, null, context.getBinaryBooleanInspectorFactory(),
                         select.getRetainMissing(), inputSchemas[0].findVariable(select.getMissingPlaceholderVariable()),
                         context.getMissingWriterFactory());
+        runtime.setSourceLocation(select.getSourceLocation());
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         builder.contributeMicroOperator(select, runtime, recDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index 01e9a0c..0e8005d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -65,6 +65,7 @@
         StringStreamingScriptDescription sssd = (StringStreamingScriptDescription) scriptDesc;
         StringStreamingRuntimeFactory runtime = new StringStreamingRuntimeFactory(sssd.getCommand(),
                 sssd.getPrinterFactories(), sssd.getFieldDelimiter(), sssd.getParserFactory());
+        runtime.setSourceLocation(scriptOp.getSourceLocation());
         RecordDescriptor recDesc =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         builder.contributeMicroOperator(scriptOp, runtime, recDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index 95efbac..5ec6d0a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -101,6 +101,7 @@
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         SubplanRuntimeFactory runtime =
                 new SubplanRuntimeFactory(np, missingWriterFactories, inputRecordDesc, recDesc, null);
+        runtime.setSourceLocation(subplan.getSourceLocation());
         builder.contributeMicroOperator(subplan, runtime, recDesc);
 
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index cd696bc..57f8f42 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -94,8 +94,10 @@
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints =
                 mp.getTokenizerRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys,
                         secondaryKeys, null, inputDesc, context, spec, true);
-        builder.contributeHyracksOperator(tokenizeOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        IOperatorDescriptor opDesc = runtimeAndConstraints.first;
+        opDesc.setSourceLocation(tokenizeOp.getSourceLocation());
+        builder.contributeHyracksOperator(tokenizeOp, opDesc);
+        builder.contributeAlgebricksPartitionConstraint(opDesc, runtimeAndConstraints.second);
         ILogicalOperator src = tokenizeOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, tokenizeOp, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 4ccce92..fcd9fa7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -50,6 +50,7 @@
 
         UnionAllOperatorDescriptor opDesc =
                 new UnionAllOperatorDescriptor(builder.getJobSpec(), op.getInputs().size(), recordDescriptor);
+        opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
         super.contributeRuntimeOperator(builder, context, op, opSchema, inputSchemas, outerPlanSchema);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 7ec1914..70e596a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -101,9 +101,10 @@
         JobSpecification spec = builder.getJobSpec();
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getWriteResultRuntime(
                 dataSource, propagatedSchema, keys, payload, additionalFilteringKeys, context, spec);
-
-        builder.contributeHyracksOperator(writeResultOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+        IOperatorDescriptor opDesc = runtimeAndConstraints.first;
+        opDesc.setSourceLocation(writeResultOp.getSourceLocation());
+        builder.contributeHyracksOperator(writeResultOp, opDesc);
+        builder.contributeAlgebricksPartitionConstraint(opDesc, runtimeAndConstraints.second);
         ILogicalOperator src = writeResultOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, writeResultOp, 0);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index c574cd8..67199b9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -262,6 +262,7 @@
     public static ILogicalOperator deepCopy(ILogicalOperator op) throws AlgebricksException {
         OperatorDeepCopyVisitor visitor = new OperatorDeepCopyVisitor();
         AbstractLogicalOperator copiedOperator = (AbstractLogicalOperator) op.accept(visitor, null);
+        copiedOperator.setSourceLocation(op.getSourceLocation());
         copiedOperator.setExecutionMode(op.getExecutionMode());
         copiedOperator.getAnnotations().putAll(op.getAnnotations());
         copiedOperator.setSchema(op.getSchema());
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/pom.xml b/hyracks-fullstack/algebricks/algebricks-rewriter/pom.xml
index d9dccce..dd1cbe6 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/pom.xml
@@ -49,6 +49,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-dataflow-common</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
index 7802b29..7fc2efe 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractDecorrelationRule.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public abstract class AbstractDecorrelationRule implements IAlgebraicRewriteRule {
 
@@ -91,9 +92,11 @@
 
     protected void buildVarExprList(Collection<LogicalVariable> vars, IOptimizationContext context, GroupByOperator g,
             List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
+        SourceLocation sourceLoc = g.getSourceLocation();
         for (LogicalVariable ov : vars) {
             LogicalVariable newVar = context.newVar();
-            ILogicalExpression varExpr = new VariableReferenceExpression(newVar);
+            VariableReferenceExpression varExpr = new VariableReferenceExpression(newVar);
+            varExpr.setSourceLocation(sourceLoc);
             outVeList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(ov,
                     new MutableObject<ILogicalExpression>(varExpr)));
             for (ILogicalPlan p : g.getNestedPlans()) {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
index bce72c1..036b3e1 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractExtractExprRule.java
@@ -36,6 +36,7 @@
             IOptimizationContext context) throws AlgebricksException {
         LogicalVariable v = context.newVar();
         AssignOperator a = new AssignOperator(v, new MutableObject<ILogicalExpression>(gExpr));
+        a.setSourceLocation(gExpr.getSourceLocation());
         a.getInputs().add(new MutableObject<ILogicalOperator>(opRef2.getValue()));
         opRef2.setValue(a);
         if (gExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 3f61cc0..ade2402 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public abstract class AbstractIntroduceCombinerRule implements IAlgebraicRewriteRule {
 
@@ -67,7 +68,7 @@
 
     protected Pair<Boolean, Mutable<ILogicalOperator>> tryToPushAgg(AggregateOperator initAgg, GroupByOperator newGbyOp,
             Set<SimilarAggregatesInfo> toReplaceSet, IOptimizationContext context) throws AlgebricksException {
-
+        SourceLocation sourceLoc = initAgg.getSourceLocation();
         List<LogicalVariable> initVars = initAgg.getVariables();
         List<Mutable<ILogicalExpression>> initExprs = initAgg.getExpressions();
         int numExprs = initVars.size();
@@ -87,6 +88,7 @@
         for (int i = 0; i < numExprs; i++) {
             Mutable<ILogicalExpression> expRef = initExprs.get(i);
             AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expRef.getValue();
+            SourceLocation aggFunSourceLoc = aggFun.getSourceLocation();
             IFunctionInfo fi1 = aggFun.getStepOneAggregate();
             // Clone the aggregate's args.
             List<Mutable<ILogicalExpression>> newArgs = new ArrayList<>(aggFun.getArguments().size());
@@ -98,10 +100,13 @@
             SimilarAggregatesInfo inf = new SimilarAggregatesInfo();
             LogicalVariable newAggVar = context.newVar();
             pushedVars.add(newAggVar);
-            inf.stepOneResult = new VariableReferenceExpression(newAggVar);
+            VariableReferenceExpression newAggVarRef = new VariableReferenceExpression(newAggVar);
+            newAggVarRef.setSourceLocation(aggFunSourceLoc);
+            inf.stepOneResult = newAggVarRef;
             inf.simAggs = new ArrayList<>();
             toReplaceSet.add(inf);
             AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
+            aggLocal.setSourceLocation(aggFunSourceLoc);
             pushedExprs.add(new MutableObject<>(aggLocal));
             AggregateExprInfo aei = new AggregateExprInfo();
             aei.aggExprRef = expRef;
@@ -112,6 +117,7 @@
 
         if (!pushedVars.isEmpty()) {
             AggregateOperator pushedAgg = new AggregateOperator(pushedVars, pushedExprs);
+            pushedAgg.setSourceLocation(sourceLoc);
             pushedAgg.setExecutionMode(ExecutionMode.LOCAL);
             // If newGbyOp is null, then we optimizing an aggregate without group by.
             if (newGbyOp != null) {
@@ -133,6 +139,7 @@
 
                 // Hook up the nested aggregate op with the outer group by.
                 NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<>(newGbyOp));
+                nts.setSourceLocation(sourceLoc);
                 nts.setExecutionMode(ExecutionMode.LOCAL);
                 bottomRef.setValue(nts);
                 pushedAgg.getInputs().add(inputRef);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index a921301..dc48d96 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -50,6 +50,7 @@
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public abstract class AbstractIntroduceGroupByCombinerRule extends AbstractIntroduceCombinerRule {
 
@@ -105,7 +106,9 @@
         for (LogicalVariable var : freeVars) {
             if (!propagatedVars.contains(var)) {
                 LogicalVariable newDecorVar = context.newVar();
-                newGbyOp.addDecorExpression(newDecorVar, new VariableReferenceExpression(var));
+                VariableReferenceExpression varRef = new VariableReferenceExpression(var);
+                varRef.setSourceLocation(gbyOp.getSourceLocation());
+                newGbyOp.addDecorExpression(newDecorVar, varRef);
                 VariableUtilities.substituteVariables(gbyOp.getNestedPlans().get(0).getRoots().get(0).getValue(), var,
                         newDecorVar, context);
             }
@@ -128,10 +131,12 @@
 
     private GroupByOperator opToPush(GroupByOperator gbyOp, BookkeepingInfo bi, IOptimizationContext context)
             throws AlgebricksException {
+        SourceLocation sourceLoc = gbyOp.getSourceLocation();
         // Hook up input to new group-by.
         Mutable<ILogicalOperator> opRef3 = gbyOp.getInputs().get(0);
         ILogicalOperator op3 = opRef3.getValue();
         GroupByOperator newGbyOp = new GroupByOperator();
+        newGbyOp.setSourceLocation(sourceLoc);
         newGbyOp.getInputs().add(new MutableObject<ILogicalOperator>(op3));
         // Copy annotations.
         Map<String, Object> annotations = newGbyOp.getAnnotations();
@@ -199,7 +204,9 @@
         // set the vars in the new op
         int n = newOpGbyList.size();
         for (int i = 0; i < n; i++) {
-            newGbyOp.addGbyExpression(replGbyList.get(i), new VariableReferenceExpression(newOpGbyList.get(i)));
+            VariableReferenceExpression varRef = new VariableReferenceExpression(newOpGbyList.get(i));
+            varRef.setSourceLocation(sourceLoc);
+            newGbyOp.addGbyExpression(replGbyList.get(i), varRef);
             VariableUtilities.substituteVariables(gbyOp, newOpGbyList.get(i), replGbyList.get(i), false, context);
         }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/BreakSelectIntoConjunctsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/BreakSelectIntoConjunctsRule.java
index d975cce..ab665b3 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/BreakSelectIntoConjunctsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/BreakSelectIntoConjunctsRule.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class BreakSelectIntoConjunctsRule implements IAlgebraicRewriteRule {
 
@@ -58,6 +59,8 @@
             return false;
         }
 
+        SourceLocation sourceLoc = select.getSourceLocation();
+
         Mutable<ILogicalOperator> childOfSelect = select.getInputs().get(0);
         boolean fst = true;
         ILogicalOperator botOp = select;
@@ -70,6 +73,7 @@
             } else {
                 SelectOperator newSelect = new SelectOperator(new MutableObject<ILogicalExpression>(e),
                         select.getRetainMissing(), select.getMissingPlaceholderVariable());
+                newSelect.setSourceLocation(sourceLoc);
                 List<Mutable<ILogicalOperator>> botInpList = botOp.getInputs();
                 botInpList.clear();
                 botInpList.add(new MutableObject<ILogicalOperator>(newSelect));
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
index 0b94fdd..28139d4 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
@@ -83,6 +83,7 @@
         ntsToEtsInSubplan(subplan, context);
         cleanupJoins(subplan);
         InnerJoinOperator join = new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+        join.setSourceLocation(op.getSourceLocation());
         join.getInputs().add(opRef3);
         opRef2.setValue(OperatorManipulationUtil.eliminateSingleSubplanOverEts(subplan));
         join.getInputs().add(new MutableObject<ILogicalOperator>(op));
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
index fa35a98..54fe09d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -117,6 +117,7 @@
 
         InnerJoinOperator product =
                 new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+        product.setSourceLocation(op.getSourceLocation());
         // Outer branch.
         product.getInputs().add(new MutableObject<ILogicalOperator>(outerRoot));
         // Inner branch.
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java
index ae52c35..d9849b1 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ConsolidateSelectsRule.java
@@ -79,6 +79,7 @@
             // Initialize the new conjuncts, if necessary.
             if (conj == null) {
                 conj = new ScalarFunctionCallExpression(andFn);
+                conj.setSourceLocation(firstSelect.getSourceLocation());
                 // Add the first select's condition.
                 conj.getArguments().add(new MutableObject<ILogicalExpression>(firstSelect.getCondition().getValue()));
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
index 53548e4..1eaf9c7 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class CopyLimitDownRule implements IAlgebraicRewriteRule {
 
@@ -84,9 +85,11 @@
             ILogicalOperator safeOp = safeOpRef.getValue();
             Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
             ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+            SourceLocation sourceLoc = limitOp.getSourceLocation();
             LimitOperator limitCloneOp = null;
             if (limitOp.getOffset().getValue() == null) {
                 limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
+                limitCloneOp.setSourceLocation(sourceLoc);
             } else {
                 // Need to add an offset to the given limit value
                 // since the original topmost limit will use the offset value.
@@ -98,7 +101,9 @@
                         new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
                 addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
                 ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+                maxPlusOffset.setSourceLocation(sourceLoc);
                 limitCloneOp = new LimitOperator(maxPlusOffset, false);
+                limitCloneOp.setSourceLocation(sourceLoc);
             }
             limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
             limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index baad59b..096520e 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -141,6 +141,7 @@
             List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs =
                     deepCopyOrderAndExpression(sourceOrderOp.getOrderExpressions());
             OrderOperator newOrderOp = new OrderOperator(orderExprs);
+            newOrderOp.setSourceLocation(sourceOrderOp.getSourceLocation());
             context.addToDontApplySet(this, newOrderOp);
             inputs.set(i, new MutableObject<ILogicalOperator>(newOrderOp));
             newOrderOp.getInputs().add(inputOpRef);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index f06933a..066b6c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -89,6 +89,7 @@
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
 
 public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
@@ -526,16 +527,20 @@
     private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
             Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
             throws AlgebricksException {
+        SourceLocation sourceLoc = topOp.getValue().getSourceLocation();
         List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<>();
         for (LocalOrderProperty orderProperty : oList) {
             for (OrderColumn oc : orderProperty.getOrderColumns()) {
                 IOrder ordType = (oc.getOrder() == OrderKind.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
-                Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<>(ordType,
-                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oc.getColumn())));
+                VariableReferenceExpression ocColumnRef = new VariableReferenceExpression(oc.getColumn());
+                ocColumnRef.setSourceLocation(sourceLoc);
+                Pair<IOrder, Mutable<ILogicalExpression>> pair =
+                        new Pair<>(ordType, new MutableObject<ILogicalExpression>(ocColumnRef));
                 oe.add(pair);
             }
         }
         OrderOperator oo = new OrderOperator(oe);
+        oo.setSourceLocation(sourceLoc);
         oo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
         if (isMicroOp) {
             oo.setPhysicalOperator(new InMemoryStableSortPOperator());
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index b95d6e4..87053c8 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -48,6 +48,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 /**
  * Factors out common sub-expressions by assigning them to a variables, and replacing the common sub-expressions with references to those variables.
@@ -255,7 +256,10 @@
                         // Also just replace the expr if we are replacing common exprs from within the same operator.
                         if (liveVars.contains(exprEqClass.getVariable()) || !liveVars.containsAll(usedVars)
                                 || op == exprEqClass.getFirstOperator()) {
-                            exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+                            VariableReferenceExpression varRef =
+                                    new VariableReferenceExpression(exprEqClass.getVariable());
+                            varRef.setSourceLocation(expr.getSourceLocation());
+                            exprRef.setValue(varRef);
                             // Do not descend into children since this expr has been completely replaced.
                             return true;
                         }
@@ -267,7 +271,10 @@
                         VariableUtilities.getLiveVariables(op, liveVars);
                         //rewrite only when the variable is live
                         if (liveVars.contains(exprEqClass.getVariable())) {
-                            exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+                            VariableReferenceExpression varRef =
+                                    new VariableReferenceExpression(exprEqClass.getVariable());
+                            varRef.setSourceLocation(expr.getSourceLocation());
+                            exprRef.setValue(varRef);
                             // Do not descend into children since this expr has been completely replaced.
                             return true;
                         }
@@ -295,6 +302,7 @@
 
         private boolean assignCommonExpression(ExprEquivalenceClass exprEqClass, ILogicalExpression expr)
                 throws AlgebricksException {
+            SourceLocation sourceLoc = expr.getSourceLocation();
             AbstractLogicalOperator firstOp = (AbstractLogicalOperator) exprEqClass.getFirstOperator();
             Mutable<ILogicalExpression> firstExprRef = exprEqClass.getFirstExpression();
             if (firstOp.getOperatorTag() == LogicalOperatorTag.INNERJOIN
@@ -313,6 +321,7 @@
                 // Place a Select operator beneath op that contains the enclosing expression.
                 SelectOperator selectOp =
                         new SelectOperator(new MutableObject<ILogicalExpression>(enclosingExpr), false, null);
+                selectOp.setSourceLocation(enclosingExpr.getSourceLocation());
                 selectOp.getInputs().add(new MutableObject<ILogicalOperator>(op.getInputs().get(0).getValue()));
                 op.getInputs().get(0).setValue(selectOp);
                 // Set firstOp to be the select below op, since we want to assign the common subexpr there.
@@ -324,12 +333,15 @@
             LogicalVariable newVar = context.newVar();
             AssignOperator newAssign = new AssignOperator(newVar,
                     new MutableObject<ILogicalExpression>(firstExprRef.getValue().cloneExpression()));
+            newAssign.setSourceLocation(sourceLoc);
             // Place assign below firstOp.
             newAssign.getInputs().add(new MutableObject<ILogicalOperator>(firstOp.getInputs().get(0).getValue()));
             newAssign.setExecutionMode(firstOp.getExecutionMode());
             firstOp.getInputs().get(0).setValue(newAssign);
             // Replace original expr with variable reference, and set var in expression equivalence class.
-            firstExprRef.setValue(new VariableReferenceExpression(newVar));
+            VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar);
+            newVarRef.setSourceLocation(sourceLoc);
+            firstExprRef.setValue(newVarRef);
             exprEqClass.setVariable(newVar);
             context.computeAndSetTypeEnvironmentForOperator(newAssign);
             context.computeAndSetTypeEnvironmentForOperator(firstOp);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 4c2d910..3335d71 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -50,6 +50,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
 
@@ -162,7 +163,9 @@
                 continue;
             }
             candidate = group.get(0);
+            SourceLocation candidateSourceLoc = candidate.getValue().getSourceLocation();
             ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags);
+            rop.setSourceLocation(candidateSourceLoc);
             rop.setPhysicalOperator(new ReplicatePOperator());
             Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
             AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
@@ -204,7 +207,9 @@
             VariableUtilities.getLiveVariables(candidate.getValue(), liveVarsNew);
             ArrayList<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
             for (LogicalVariable liveVar : liveVarsNew) {
-                assignExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar)));
+                VariableReferenceExpression liveVarRef = new VariableReferenceExpression(liveVar);
+                liveVarRef.setSourceLocation(candidateSourceLoc);
+                assignExprs.add(new MutableObject<ILogicalExpression>(liveVarRef));
             }
             for (Mutable<ILogicalOperator> ref : group) {
                 if (ref.equals(candidate)) {
@@ -218,10 +223,14 @@
                     liveVars.add(variableMappingBack.get(liveVarsNew.get(i)));
                 }
 
+                SourceLocation refSourceLoc = ref.getValue().getSourceLocation();
+
                 AbstractLogicalOperator assignOperator = new AssignOperator(liveVars, assignExprs);
+                assignOperator.setSourceLocation(refSourceLoc);
                 assignOperator.setExecutionMode(rop.getExecutionMode());
                 assignOperator.setPhysicalOperator(new AssignPOperator());
                 AbstractLogicalOperator projectOperator = new ProjectOperator(liveVars);
+                projectOperator.setSourceLocation(refSourceLoc);
                 projectOperator.setPhysicalOperator(new StreamProjectPOperator());
                 projectOperator.setExecutionMode(rop.getExecutionMode());
                 AbstractLogicalOperator exchOp = new ExchangeOperator();
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractFunctionsFromJoinConditionRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractFunctionsFromJoinConditionRule.java
index 198510a..d937eb1 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractFunctionsFromJoinConditionRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractFunctionsFromJoinConditionRule.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 /**
  * Factors out function expressions from each comparison function or similarity function in join condition by
@@ -104,9 +105,11 @@
         } else if (AlgebricksBuiltinFunctions.isComparisonFunction(fi) || isComparisonFunction(fi)) {
             for (Mutable<ILogicalExpression> exprRef : fexp.getArguments()) {
                 if (exprRef.getValue().getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+                    SourceLocation exprRefSourceLoc = exprRef.getValue().getSourceLocation();
                     LogicalVariable newVar = context.newVar();
                     AssignOperator newAssign = new AssignOperator(newVar,
                             new MutableObject<ILogicalExpression>(exprRef.getValue().cloneExpression()));
+                    newAssign.setSourceLocation(exprRefSourceLoc);
                     newAssign.setExecutionMode(joinOp.getExecutionMode());
 
                     // Place assign below joinOp.
@@ -137,7 +140,9 @@
 
                     if (modified) {
                         // Replace original expr with variable reference.
-                        exprRef.setValue(new VariableReferenceExpression(newVar));
+                        VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar);
+                        newVarRef.setSourceLocation(exprRefSourceLoc);
+                        exprRef.setValue(newVarRef);
                         context.computeAndSetTypeEnvironmentForOperator(newAssign);
                         context.computeAndSetTypeEnvironmentForOperator(joinOp);
                     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
index 7a0012a..eb2bee6 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGbyExpressionsRule.java
@@ -73,7 +73,9 @@
             ILogicalExpression expr = gbyPair.second.getValue();
             if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
                 LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context);
-                gbyPair.second.setValue(new VariableReferenceExpression(v));
+                VariableReferenceExpression vRef = new VariableReferenceExpression(v);
+                vRef.setSourceLocation(expr.getSourceLocation());
+                gbyPair.second.setValue(vRef);
             }
         }
         return true;
@@ -88,7 +90,9 @@
             ILogicalExpression expr = decorPair.second.getValue();
             if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                 LogicalVariable v = extractExprIntoAssignOpRef(expr, opRef2, context);
-                decorPair.second.setValue(new VariableReferenceExpression(v));
+                VariableReferenceExpression vRef = new VariableReferenceExpression(v);
+                vRef.setSourceLocation(expr.getSourceLocation());
+                decorPair.second.setValue(vRef);
             }
         }
         return true;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGroupByDecorVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGroupByDecorVariablesRule.java
index 05cc7b6..5bbc80d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGroupByDecorVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractGroupByDecorVariablesRule.java
@@ -76,7 +76,9 @@
             exprs.add(exprRef);
 
             // Normalizes the decor entry -- expression be a variable reference
-            decorVarExpr.second = new MutableObject<>(new VariableReferenceExpression(newVar));
+            VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar);
+            newVarRef.setSourceLocation(expr.getSourceLocation());
+            decorVarExpr.second = new MutableObject<>(newVarRef);
         }
         if (!changed) {
             return false;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/FactorRedundantGroupAndDecorVarsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/FactorRedundantGroupAndDecorVarsRule.java
index 2f28a84..d549b90 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/FactorRedundantGroupAndDecorVarsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/FactorRedundantGroupAndDecorVarsRule.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class FactorRedundantGroupAndDecorVarsRule implements IAlgebraicRewriteRule {
 
@@ -78,8 +79,11 @@
             LogicalVariable lhs = varRhsToLhs.get(v);
             if (lhs != null) {
                 if (p.first != null) {
-                    AssignOperator assign = new AssignOperator(p.first,
-                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(lhs)));
+                    VariableReferenceExpression lhsRef = new VariableReferenceExpression(lhs);
+                    SourceLocation sourceLoc = p.second.getValue().getSourceLocation();
+                    lhsRef.setSourceLocation(sourceLoc);
+                    AssignOperator assign = new AssignOperator(p.first, new MutableObject<ILogicalExpression>(lhsRef));
+                    assign.setSourceLocation(sourceLoc);
                     ILogicalOperator op = opRef.getValue();
                     assign.getInputs().add(new MutableObject<ILogicalOperator>(op));
                     opRef.setValue(assign);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java
index d7090d2..e8f701c 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java
@@ -195,7 +195,9 @@
                 LogicalVariable replacementVar = context.newVar();
                 assignVars.add(replacementVar);
                 assignExprs.add(new MutableObject<ILogicalExpression>(funcExpr));
-                funcExprRef.setValue(new VariableReferenceExpression(replacementVar));
+                VariableReferenceExpression replacementVarRef = new VariableReferenceExpression(replacementVar);
+                replacementVarRef.setSourceLocation(funcExpr.getSourceLocation());
+                funcExprRef.setValue(replacementVarRef);
                 funcIter.remove();
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanRule.java
index 4cd15a0..2ce0e63 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanRule.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class EliminateSubplanRule implements IAlgebraicRewriteRule {
 
@@ -101,6 +102,7 @@
     private void elimSubplanOverEts(Mutable<ILogicalOperator> opRef, IOptimizationContext ctx)
             throws AlgebricksException {
         SubplanOperator subplan = (SubplanOperator) opRef.getValue();
+        SourceLocation sourceLoc = subplan.getSourceLocation();
         for (ILogicalPlan p : subplan.getNestedPlans()) {
             for (Mutable<ILogicalOperator> r : p.getRoots()) {
                 OperatorManipulationUtil.ntsToEts(r, ctx);
@@ -117,6 +119,7 @@
                 } else {
                     InnerJoinOperator j =
                             new InnerJoinOperator(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+                    j.setSourceLocation(sourceLoc);
                     j.getInputs().add(new MutableObject<ILogicalOperator>(topOp));
                     j.getInputs().add(r);
                     ctx.setOutputTypeEnvironment(j, j.computeOutputTypeEnvironment(ctx));
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
index 5118bf3..a988075 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
@@ -62,6 +62,7 @@
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 /**
  * The rule searches for SUBPLAN operator with a optional PROJECT operator and
@@ -236,26 +237,35 @@
             default:
                 break;
         }
+
+        SourceLocation sourceLoc = subplan.getSourceLocation();
+
         if (testForNull == null) {
             testForNull = context.newVar();
             AssignOperator tmpAsgn =
                     new AssignOperator(testForNull, new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+            tmpAsgn.setSourceLocation(sourceLoc);
             tmpAsgn.getInputs().add(new MutableObject<ILogicalOperator>(rightRef.getValue()));
             rightRef.setValue(tmpAsgn);
             context.computeAndSetTypeEnvironmentForOperator(tmpAsgn);
         }
 
         IFunctionInfo finfoEq = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.IS_MISSING);
-        ILogicalExpression isNullTest = new ScalarFunctionCallExpression(finfoEq,
+        ScalarFunctionCallExpression isNullTest = new ScalarFunctionCallExpression(finfoEq,
                 new MutableObject<ILogicalExpression>(new VariableReferenceExpression(testForNull)));
+        isNullTest.setSourceLocation(sourceLoc);
         IFunctionInfo finfoNot = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.NOT);
         ScalarFunctionCallExpression nonNullTest =
                 new ScalarFunctionCallExpression(finfoNot, new MutableObject<ILogicalExpression>(isNullTest));
+        nonNullTest.setSourceLocation(sourceLoc);
         SelectOperator selectNonNull =
                 new SelectOperator(new MutableObject<ILogicalExpression>(nonNullTest), false, null);
+        selectNonNull.setSourceLocation(sourceLoc);
         GroupByOperator g = new GroupByOperator();
+        g.setSourceLocation(sourceLoc);
         Mutable<ILogicalOperator> newSubplanRef = new MutableObject<ILogicalOperator>(subplan);
         NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(g));
+        nts.setSourceLocation(sourceLoc);
         opRef.setValue(g);
         selectNonNull.getInputs().add(new MutableObject<ILogicalOperator>(nts));
 
@@ -318,10 +328,12 @@
     private Map<LogicalVariable, LogicalVariable> buildVarExprList(Collection<LogicalVariable> vars,
             IOptimizationContext context, GroupByOperator g,
             List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> outVeList) throws AlgebricksException {
+        SourceLocation sourceLoc = g.getSourceLocation();
         Map<LogicalVariable, LogicalVariable> m = new HashMap<LogicalVariable, LogicalVariable>();
         for (LogicalVariable ov : vars) {
             LogicalVariable newVar = context.newVar();
             ILogicalExpression varExpr = new VariableReferenceExpression(newVar);
+            ((VariableReferenceExpression) varExpr).setSourceLocation(sourceLoc);
             outVeList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(ov,
                     new MutableObject<ILogicalExpression>(varExpr)));
             for (ILogicalPlan p : g.getNestedPlans()) {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
index f809e96..e37be1b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
@@ -88,6 +88,7 @@
                 }
                 ntsRef.setValue(opUnder.getValue());
                 LeftOuterJoinOperator loj = new LeftOuterJoinOperator(join.getCondition());
+                loj.setSourceLocation(join.getSourceLocation());
                 loj.getInputs().add(leftRef);
                 loj.getInputs().add(rightRef);
                 opRef.setValue(loj);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
index d9acf53..53b8eff 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 /**
  * replace Subplan operators with nested loop joins where the join condition is true, if the Subplan
@@ -103,20 +104,24 @@
                 continue;
             }
 
+            SourceLocation sourceLoc = subplan.getSourceLocation();
+
             /**
              * Expends the input and roots into a DAG of nested loop joins.
              * Though joins should be left-outer joins, a left-outer join with condition TRUE is equivalent to an inner join.
              **/
             Mutable<ILogicalExpression> expr = new MutableObject<ILogicalExpression>(ConstantExpression.TRUE);
             Mutable<ILogicalOperator> nestedRootRef = nestedRoots.get(0);
-            ILogicalOperator join =
+            InnerJoinOperator join =
                     new InnerJoinOperator(expr, new MutableObject<ILogicalOperator>(subplanInput), nestedRootRef);
+            join.setSourceLocation(sourceLoc);
 
             /** rewrite the nested tuple source to be empty tuple source */
             rewriteNestedTupleSource(nestedRootRef, context);
 
             for (int i = 1; i < nestedRoots.size(); i++) {
                 join = new InnerJoinOperator(expr, new MutableObject<ILogicalOperator>(join), nestedRoots.get(i));
+                join.setSourceLocation(sourceLoc);
             }
             op1.getInputs().get(index).setValue(join);
             context.computeAndSetTypeEnvironmentForOperator(join);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index f90de81..82b6f9c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -22,7 +22,11 @@
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public interface IPushRuntimeFactory extends Serializable {
     IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException;
+
+    default void setSourceLocation(SourceLocation sourceLoc) {
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 0a578f6..b260a8a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -150,8 +150,8 @@
                     ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
                     byte[] data = tb.getByteArray();
                     if (data.length > memoryBudget) {
-                        throw HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS, data.length,
-                                memoryBudget);
+                        throw HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS, sourceLoc,
+                                data.length, memoryBudget);
                     }
                 }
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 75b2fb2..f057515 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -37,11 +37,11 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 
-public class NestedPlansRunningAggregatorFactory implements IAggregatorDescriptorFactory {
+public class NestedPlansRunningAggregatorFactory extends AbstractAggregatorDescriptorFactory {
 
     private static final long serialVersionUID = 1L;
     private final AlgebricksPipeline[] subplans;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 7b3fb46..525e452 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -19,11 +19,10 @@
 package org.apache.hyracks.algebricks.runtime.operators.base;
 
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public abstract class AbstractOneInputOneOutputRuntimeFactory implements IPushRuntimeFactory {
+public abstract class AbstractOneInputOneOutputRuntimeFactory extends AbstractPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractPushRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractPushRuntimeFactory.java
new file mode 100644
index 0000000..89f3696
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractPushRuntimeFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.base;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public abstract class AbstractPushRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    protected SourceLocation sourceLoc;
+
+    @Override
+    public void setSourceLocation(SourceLocation sourceLoc) {
+        this.sourceLoc = sourceLoc;
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index f0e9406..1aceadc 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -21,11 +21,10 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class SinkRuntimeFactory implements IPushRuntimeFactory {
+public class SinkRuntimeFactory extends AbstractPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index aefc99d..b1b652f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -172,7 +172,7 @@
                         }
                     }
                 } catch (HyracksDataException e) {
-                    throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, tupleIndex);
+                    throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, tupleIndex);
                 }
             }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 67f4a77..7bd924d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -19,15 +19,15 @@
 package org.apache.hyracks.algebricks.runtime.operators.std;
 
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSourcePushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 
-public class EmptyTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+public class EmptyTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 8e64092..f94672d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -21,12 +21,12 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class NestedTupleSourceRuntimeFactory implements IPushRuntimeFactory {
+public class NestedTupleSourceRuntimeFactory extends AbstractPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index 8a06ecf..7d6f851 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -21,12 +21,12 @@
 import org.apache.hyracks.algebricks.data.IAWriter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
-public class PrinterRuntimeFactory implements IPushRuntimeFactory {
+public class PrinterRuntimeFactory extends AbstractPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index d41b464..eae8178 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -28,12 +28,12 @@
 import org.apache.hyracks.algebricks.data.IAWriterFactory;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class SinkWriterRuntimeFactory implements IPushRuntimeFactory {
+public class SinkWriterRuntimeFactory extends AbstractPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
index 1706e59..e727551 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
@@ -24,13 +24,13 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class MicroUnionAllRuntimeFactory implements IPushRuntimeFactory {
+public class MicroUnionAllRuntimeFactory extends AbstractPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;