[NO ISSUE][COMP] Add sanity checks to the optimizer

Details:
- Introduce sanity checking code in the optimizer that
  runs before and after each rule (when enabled)
- PlanStructureVerifier runs after each fired rule and checks
  that there are no shared references/instances in the plan
- PlanStabilityVerifier runs after each failed rule and checks
  that the rule did not make any changes in the plan
- Add new compiler property "compiler.internal.sanitycheck"
  which controls whether the above sanity checks are
  performed during query compilation.
  Its default value is 'false', so sanity checks are disabled
- Modify testsuite configuration to enable these sanity checks
- Remove ConstantExpression.setValue()
- Add IPlanPrettyPrinter.printExpression()

Change-Id: I4e53f57dd6263d3b292700490d9e57f5cd48cc19
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7407
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 35acfb8..f0972b7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -102,5 +102,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 726a5df..166ab9a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -25,6 +25,8 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableEvalSizeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.plan.PlanStabilityVerifier;
+import org.apache.hyracks.algebricks.core.algebra.plan.PlanStructureVerifier;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
@@ -87,4 +89,8 @@
     public INodeDomain getComputationNodeDomain();
 
     public IWarningCollector getWarningCollector();
+
+    public PlanStructureVerifier getPlanStructureVerifier();
+
+    public PlanStabilityVerifier getPlanStabilityVerifier();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
index 42ff3c0..40e729d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
@@ -19,9 +19,7 @@
 package org.apache.hyracks.algebricks.core.algebra.expressions;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -31,7 +29,7 @@
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
 
 public final class ConstantExpression extends AbstractLogicalExpression {
-    private IAlgebricksConstantValue value;
+    private final IAlgebricksConstantValue value;
 
     public static final ConstantExpression TRUE = new ConstantExpression(new IAlgebricksConstantValue() {
 
@@ -115,8 +113,6 @@
         }
     });
 
-    private Map<Object, IExpressionAnnotation> annotationMap = new HashMap<>();
-
     public ConstantExpression(IAlgebricksConstantValue value) {
         this.value = value;
     }
@@ -125,10 +121,6 @@
         return value;
     }
 
-    public void setValue(IAlgebricksConstantValue value) {
-        this.value = value;
-    }
-
     @Override
     public LogicalExpressionTag getExpressionTag() {
         return LogicalExpressionTag.CONSTANT;
@@ -170,21 +162,13 @@
 
     @Override
     public AbstractLogicalExpression cloneExpression() {
-        Map<Object, IExpressionAnnotation> m = new HashMap<>();
-        annotationMap.forEach((key, value1) -> m.put(key, value1.copy()));
         ConstantExpression c = new ConstantExpression(value);
-        c.annotationMap = m;
         c.setSourceLocation(sourceLoc);
         return c;
     }
 
-    public Map<Object, IExpressionAnnotation> getAnnotations() {
-        return annotationMap;
-    }
-
     @Override
     public boolean splitIntoConjuncts(List<Mutable<ILogicalExpression>> conjs) {
         return false;
     }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
new file mode 100644
index 0000000..3f7a0c1
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifier.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.plan;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Verifies whether there were any changes in the plan and fails if it finds:
+ * <ul>
+ * <li>new or deleted operator reference: {@code MutableObject<ILogicalOperator>}</li>
+ * <li>new or deleted operator instance: {@code ILogicalOperator}</li>
+ * <li>new or deleted expression reference: {@code MutableObject<ILogicalExpression>}</li>
+ * <li>new or deleted expression instance: {@code ILogicalExpression}</li>
+ * </ul>
+ *
+ * Usage:
+ * <ol>
+ * <li>Invoke {@link #recordPlanSignature(Mutable)} to save the plan signature</li>
+ * <li>Run an optimization rule on this plan</li>
+ * <li>If the rule said that it didn't make any changes then
+ * invoke {@link #comparePlanSignature(Mutable)} to verify that</li>
+ * <li>(optionally) Invoke {@link #discardPlanSignature()} to discard the recorded state</li>
+ * </ol>
+ */
+public final class PlanStabilityVerifier {
+
+    static final String MSG_CREATED = "created";
+
+    static final String MSG_DELETED = "deleted";
+
+    static final String MSG_OPERATOR_REFERENCE = "operator reference (Mutable) to";
+
+    static final String MSG_OPERATOR_INSTANCE = "operator instance";
+
+    static final String MSG_EXPRESSION_REFERENCE = "expression reference (Mutable) to";
+
+    static final String MSG_EXPRESSION_INSTANCE = "expression instance";
+
+    private static final String ERROR_MESSAGE_TEMPLATE = "%s %s (%s)";
+
+    private static final int COLL_INIT_CAPACITY = 256;
+
+    private final PlanSignatureRecorderVisitor recorderVisitor = new PlanSignatureRecorderVisitor();
+
+    private final PlanStabilityVerifierVisitor verifierVisitor = new PlanStabilityVerifierVisitor();
+
+    private final List<Mutable<ILogicalOperator>> opRefColl = new ArrayList<>(COLL_INIT_CAPACITY);
+
+    private final List<ILogicalOperator> opColl = new ArrayList<>(COLL_INIT_CAPACITY);
+
+    private final List<Mutable<ILogicalExpression>> exprRefColl = new ArrayList<>(COLL_INIT_CAPACITY);
+
+    private final List<ILogicalExpression> exprColl = new ArrayList<>(COLL_INIT_CAPACITY);
+
+    private final Deque<Mutable<ILogicalOperator>> workQueue = new ArrayDeque<>(COLL_INIT_CAPACITY);
+
+    private final IPlanPrettyPrinter prettyPrinter;
+
+    public PlanStabilityVerifier(IPlanPrettyPrinter prettyPrinter) {
+        this.prettyPrinter = prettyPrinter;
+    }
+
+    public void recordPlanSignature(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        reset();
+        walk(opRef, recorderVisitor);
+    }
+
+    public void comparePlanSignature(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        if (opRefColl.isEmpty()) {
+            throw new IllegalStateException();
+        }
+        try {
+            walk(opRef, verifierVisitor);
+            ensureEmpty(opRefColl, MSG_DELETED, MSG_OPERATOR_REFERENCE, PlanStabilityVerifier::printOperator);
+            ensureEmpty(opColl, MSG_DELETED, MSG_OPERATOR_INSTANCE, PlanStabilityVerifier::printOperator);
+            ensureEmpty(exprRefColl, MSG_DELETED, MSG_EXPRESSION_REFERENCE, PlanStabilityVerifier::printExpression);
+            ensureEmpty(exprColl, MSG_DELETED, MSG_EXPRESSION_INSTANCE, PlanStabilityVerifier::printExpression);
+        } finally {
+            reset();
+        }
+    }
+
+    public void discardPlanSignature() {
+        reset();
+    }
+
+    private void reset() {
+        opRefColl.clear();
+        opColl.clear();
+        exprRefColl.clear();
+        exprColl.clear();
+    }
+
+    private final class PlanSignatureRecorderVisitor extends AbstractStabilityCheckingVisitor {
+        @Override
+        public void visit(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+            ILogicalOperator op = opRef.getValue();
+            // visit input ops of a multi-output op only if it's the first time we see this operator
+            boolean skipInputs = OperatorPropertiesUtil.isMultiOutputOperator(op) && findItem(opColl, op) >= 0;
+
+            opRefColl.add(opRef);
+            opColl.add(op);
+
+            if (!skipInputs) {
+                super.visit(opRef);
+            }
+        }
+
+        @Override
+        protected void addChildToWorkQueue(Mutable<ILogicalOperator> childOpRef, boolean addFirst)
+                throws AlgebricksException {
+            ILogicalOperator childOp = childOpRef.getValue();
+            if (!OperatorPropertiesUtil.isMultiOutputOperator(childOp) && opColl.contains(childOp)) {
+                throw new AlgebricksException("cycle: " + printOperator(childOp, prettyPrinter));
+            }
+            super.addChildToWorkQueue(childOpRef, addFirst);
+        }
+
+        @Override
+        protected void visitExpression(Mutable<ILogicalExpression> exprRef) {
+            exprRefColl.add(exprRef);
+            exprColl.add(exprRef.getValue());
+        }
+    }
+
+    private final class PlanStabilityVerifierVisitor extends AbstractStabilityCheckingVisitor {
+        @Override
+        public void visit(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+            int idx = findItem(opRefColl, opRef);
+            if (idx < 0) {
+                raiseException(MSG_CREATED, MSG_OPERATOR_REFERENCE, printOperator(opRef, prettyPrinter));
+            }
+            opRefColl.set(idx, null);
+
+            ILogicalOperator op = opRef.getValue();
+            idx = findItem(opColl, op);
+            if (idx < 0) {
+                raiseException(MSG_CREATED, MSG_OPERATOR_INSTANCE, printOperator(op, prettyPrinter));
+            }
+            opColl.set(idx, null);
+
+            // visit input ops of a multi-output op only if it's the last time we see this operator
+            boolean skipInputs = OperatorPropertiesUtil.isMultiOutputOperator(op) && findItem(opColl, op) >= 0;
+            if (!skipInputs) {
+                super.visit(opRef);
+            }
+        }
+
+        protected void visitExpression(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            int idx = findItem(exprRefColl, exprRef);
+            if (idx < 0) {
+                raiseException(MSG_CREATED, MSG_EXPRESSION_REFERENCE, printExpression(exprRef, prettyPrinter));
+            }
+            exprRefColl.set(idx, null);
+
+            ILogicalExpression expr = exprRef.getValue();
+            idx = findItem(exprColl, expr);
+            if (idx < 0) {
+                raiseException(MSG_CREATED, MSG_EXPRESSION_INSTANCE, printExpression(expr, prettyPrinter));
+            }
+            exprColl.set(idx, null);
+        }
+    }
+
+    private abstract class AbstractStabilityCheckingVisitor
+            implements IMutableReferenceVisitor<ILogicalOperator>, ILogicalExpressionReferenceTransform {
+
+        @Override
+        public void visit(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+            ILogicalOperator op = opRef.getValue();
+
+            op.acceptExpressionTransform(this);
+
+            for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+                addChildToWorkQueue(inputOpRef, true);
+            }
+            if (op instanceof AbstractOperatorWithNestedPlans) {
+                for (ILogicalPlan nestedPlan : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+                    for (Mutable<ILogicalOperator> nestedPlanRoot : nestedPlan.getRoots()) {
+                        addChildToWorkQueue(nestedPlanRoot, false);
+                    }
+                }
+            }
+        }
+
+        protected abstract void visitExpression(Mutable<ILogicalExpression> exprRef) throws AlgebricksException;
+
+        @Override
+        public final boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            visitExpression(exprRef);
+            return false;
+        }
+
+        protected void addChildToWorkQueue(Mutable<ILogicalOperator> childOpRef, boolean addFirst)
+                throws AlgebricksException {
+            if (addFirst) {
+                workQueue.addFirst(childOpRef);
+            } else {
+                workQueue.add(childOpRef);
+            }
+
+        }
+    }
+
+    private void raiseException(String operationKind, String referenceKind, String entity) throws AlgebricksException {
+        String errorMessage = String.format(ERROR_MESSAGE_TEMPLATE, operationKind, referenceKind, entity);
+        throw new AlgebricksException(errorMessage);
+    }
+
+    private <T> void ensureEmpty(List<T> list, String operationKind, String referenceKind,
+            BiFunction<T, IPlanPrettyPrinter, String> printFunction) throws AlgebricksException {
+        int idx = findNonNull(list);
+        if (idx >= 0) {
+            T listItem = list.get(idx);
+            raiseException(operationKind, referenceKind, printFunction.apply(listItem, prettyPrinter));
+        }
+    }
+
+    private static <T> int findItem(List<T> list, T item) {
+        return indexOf(list, (listItem, paramItem) -> listItem == paramItem, item);
+    }
+
+    private static <T> int findNonNull(List<T> list) {
+        return indexOf(list, (listItem, none) -> listItem != null, null);
+    }
+
+    private static <T, U> int indexOf(List<T> list, BiPredicate<T, U> predicate, U predicateParam) {
+        for (int i = 0, n = list.size(); i < n; i++) {
+            T listItem = list.get(i);
+            if (predicate.test(listItem, predicateParam)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    static String printOperator(Mutable<ILogicalOperator> opRef, IPlanPrettyPrinter printer) {
+        return printOperator(opRef.getValue(), printer);
+    }
+
+    static String printOperator(ILogicalOperator op, IPlanPrettyPrinter printer) {
+        try {
+            return printer.reset().printOperator((AbstractLogicalOperator) op, false).toString();
+        } catch (AlgebricksException e) {
+            // shouldn't happen
+            return op.toString();
+        }
+    }
+
+    static String printExpression(Mutable<ILogicalExpression> exprRef, IPlanPrettyPrinter printer) {
+        return printExpression(exprRef.getValue(), printer);
+    }
+
+    static String printExpression(ILogicalExpression expr, IPlanPrettyPrinter printer) {
+        try {
+            return printer.reset().printExpression(expr).toString();
+        } catch (AlgebricksException e) {
+            // shouldn't happen
+            return expr.toString();
+        }
+    }
+
+    private void walk(Mutable<ILogicalOperator> opRef, IMutableReferenceVisitor<ILogicalOperator> visitor)
+            throws AlgebricksException {
+        if (!workQueue.isEmpty()) {
+            throw new IllegalStateException();
+        }
+        Mutable<ILogicalOperator> currentOpRef = opRef;
+        do {
+            visitor.visit(currentOpRef);
+            currentOpRef = workQueue.pollFirst();
+        } while (currentOpRef != null);
+    }
+
+    private interface IMutableReferenceVisitor<T> {
+        void visit(Mutable<T> ref) throws AlgebricksException;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifier.java
new file mode 100644
index 0000000..1fad860
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifier.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.plan;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Verifies plan structure and fails if it finds
+ * <ul>
+ * <li>a shared operator reference: {@code MutableObject<ILogicalOperator>}</li>
+ * <li>a shared operator instance: {@code ILogicalOperator} (except if it's a multi-output operator)</li>
+ * <li>a shared expression reference: {@code MutableObject<ILogicalExpression>}</li>
+ * <li>a shared expression instance: {@code ILogicalExpression} (except if it's a {@code ConstantExpression})</li>
+ * </ul>
+ */
+public final class PlanStructureVerifier {
+
+    private static final String ERROR_MESSAGE_TEMPLATE_1 = "shared %s (%s) in %s";
+
+    private static final String ERROR_MESSAGE_TEMPLATE_2 = "shared %s (%s) between %s and %s";
+
+    private final ExpressionReferenceVerifierVisitor exprVisitor = new ExpressionReferenceVerifierVisitor();
+
+    private final Map<Mutable<ILogicalOperator>, ILogicalOperator> opRefMap = new IdentityHashMap<>();
+
+    private final Map<ILogicalOperator, ILogicalOperator> opMap = new IdentityHashMap<>();
+
+    private final Map<Mutable<ILogicalExpression>, ILogicalOperator> exprRefMap = new IdentityHashMap<>();
+
+    private final Map<ILogicalExpression, ILogicalOperator> exprMap = new IdentityHashMap<>();
+
+    private final Deque<Pair<Mutable<ILogicalOperator>, ILogicalOperator>> workQueue = new ArrayDeque<>();
+
+    private final IPlanPrettyPrinter prettyPrinter;
+
+    public PlanStructureVerifier(IPlanPrettyPrinter prettyPrinter) {
+        this.prettyPrinter = prettyPrinter;
+    }
+
+    public void verifyPlanStructure(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        reset();
+        walk(opRef);
+        reset();
+    }
+
+    private void reset() {
+        opRefMap.clear();
+        opMap.clear();
+        exprRefMap.clear();
+        exprMap.clear();
+    }
+
+    private void walk(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        if (!workQueue.isEmpty()) {
+            throw new IllegalStateException();
+        }
+        workQueue.add(new Pair<>(opRef, null));
+        for (;;) {
+            Pair<Mutable<ILogicalOperator>, ILogicalOperator> p = workQueue.pollFirst();
+            if (p == null) {
+                break;
+            }
+            Mutable<ILogicalOperator> currentOpRef = p.first;
+            ILogicalOperator currentOp = currentOpRef.getValue();
+            ILogicalOperator parentOp = p.second;
+
+            List<Mutable<ILogicalOperator>> childOps = visitOp(currentOpRef, parentOp);
+
+            for (Mutable<ILogicalOperator> childOpRef : childOps) {
+                ILogicalOperator childOp = childOpRef.getValue();
+                if (!OperatorPropertiesUtil.isMultiOutputOperator(childOp) && opMap.containsKey(childOp)) {
+                    throw new AlgebricksException(
+                            "cycle: " + PlanStabilityVerifier.printOperator(childOp, prettyPrinter));
+                }
+                workQueue.add(new Pair<>(childOpRef, currentOp));
+            }
+        }
+    }
+
+    private List<Mutable<ILogicalOperator>> visitOp(Mutable<ILogicalOperator> opRef, ILogicalOperator parentOp)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        ILogicalOperator firstParentOp;
+        firstParentOp = opRefMap.put(opRef, parentOp);
+        if (firstParentOp != null) {
+            raiseException(PlanStabilityVerifier.MSG_OPERATOR_REFERENCE,
+                    PlanStabilityVerifier.printOperator(op, prettyPrinter), firstParentOp, parentOp);
+        }
+
+        if (OperatorPropertiesUtil.isMultiOutputOperator(op) && opMap.containsKey(op)) {
+            // don't visit input ops because we've already looked at them
+            return Collections.emptyList();
+        }
+
+        firstParentOp = opMap.put(op, parentOp);
+        if (firstParentOp != null) {
+            raiseException(PlanStabilityVerifier.MSG_OPERATOR_INSTANCE,
+                    PlanStabilityVerifier.printOperator(op, prettyPrinter), firstParentOp, parentOp);
+        }
+
+        exprVisitor.setOperator(op);
+        op.acceptExpressionTransform(exprVisitor);
+
+        List<Mutable<ILogicalOperator>> children = op.getInputs();
+        if (op instanceof AbstractOperatorWithNestedPlans) {
+            children = new ArrayList<>(children);
+            for (ILogicalPlan nestedPlan : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+                children.addAll(nestedPlan.getRoots());
+            }
+        }
+        return children;
+    }
+
+    private void raiseException(String sharedReferenceKind, String sharedEntity, ILogicalOperator firstOp,
+            ILogicalOperator secondOp) throws AlgebricksException {
+        String errorMessage;
+        if (firstOp == secondOp) {
+            errorMessage = String.format(ERROR_MESSAGE_TEMPLATE_1, sharedReferenceKind, sharedEntity,
+                    PlanStabilityVerifier.printOperator(firstOp, prettyPrinter));
+        } else {
+            errorMessage = String.format(ERROR_MESSAGE_TEMPLATE_2, sharedReferenceKind, sharedEntity,
+                    PlanStabilityVerifier.printOperator(firstOp, prettyPrinter),
+                    PlanStabilityVerifier.printOperator(secondOp, prettyPrinter));
+        }
+        throw new AlgebricksException(errorMessage);
+    }
+
+    private final class ExpressionReferenceVerifierVisitor implements ILogicalExpressionReferenceTransform {
+
+        private ILogicalOperator currentOp;
+
+        void setOperator(ILogicalOperator currentOp) {
+            this.currentOp = Objects.requireNonNull(currentOp);
+        }
+
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            ILogicalExpression expr = exprRef.getValue();
+            ILogicalOperator firstOp;
+            firstOp = exprRefMap.put(exprRef, currentOp);
+            if (firstOp != null) {
+                raiseException(PlanStabilityVerifier.MSG_EXPRESSION_REFERENCE,
+                        PlanStabilityVerifier.printExpression(expr, prettyPrinter), firstOp, currentOp);
+            }
+            if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+                firstOp = exprMap.put(expr, currentOp);
+                if (firstOp != null) {
+                    raiseException(PlanStabilityVerifier.MSG_EXPRESSION_INSTANCE,
+                            PlanStabilityVerifier.printExpression(expr, prettyPrinter), firstOp, currentOp);
+                }
+            }
+            return false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
index 922e0ee..faf4976 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/IPlanPrettyPrinter.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.core.algebra.prettyprint;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 
@@ -28,7 +29,15 @@
 public interface IPlanPrettyPrinter {
 
     /** Prints the plan rooted at the operator argument. */
-    IPlanPrettyPrinter printOperator(AbstractLogicalOperator operator) throws AlgebricksException;
+    default IPlanPrettyPrinter printOperator(AbstractLogicalOperator operator) throws AlgebricksException {
+        return printOperator(operator, true);
+    }
+
+    /** Prints given operator and optionally its inputs */
+    IPlanPrettyPrinter printOperator(AbstractLogicalOperator operator, boolean printInputs) throws AlgebricksException;
+
+    /** Prints given expression */
+    IPlanPrettyPrinter printExpression(ILogicalExpression expression) throws AlgebricksException;
 
     /** Prints the whole logical plan. */
     IPlanPrettyPrinter printPlan(ILogicalPlan plan) throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 52c7e2a..1d560ce 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -94,18 +94,20 @@
     }
 
     @Override
-    public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op) throws AlgebricksException {
-        printOperatorImpl(op, 0);
+    public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs)
+            throws AlgebricksException {
+        printOperatorImpl(op, 0, printInputs);
         return this;
     }
 
     private void printPlanImpl(ILogicalPlan plan, int indent) throws AlgebricksException {
         for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-            printOperatorImpl((AbstractLogicalOperator) root.getValue(), indent);
+            printOperatorImpl((AbstractLogicalOperator) root.getValue(), indent, true);
         }
     }
 
-    private void printOperatorImpl(AbstractLogicalOperator op, int indent) throws AlgebricksException {
+    private void printOperatorImpl(AbstractLogicalOperator op, int indent, boolean printInputs)
+            throws AlgebricksException {
         op.accept(this, indent);
         IPhysicalOperator pOp = op.getPhysicalOperator();
 
@@ -117,12 +119,20 @@
             appendln(buffer, " -- |" + op.getExecutionMode() + "|");
         }
 
-        for (Mutable<ILogicalOperator> i : op.getInputs()) {
-            printOperatorImpl((AbstractLogicalOperator) i.getValue(), indent + INIT_INDENT);
+        if (printInputs) {
+            for (Mutable<ILogicalOperator> i : op.getInputs()) {
+                printOperatorImpl((AbstractLogicalOperator) i.getValue(), indent + INIT_INDENT, printInputs);
+            }
         }
     }
 
     @Override
+    public IPlanPrettyPrinter printExpression(ILogicalExpression expression) throws AlgebricksException {
+        buffer.append(expression.accept(exprVisitor, 0)); // render via exprVisitor, append to buffer
+        return this; // returned for call chaining
+    }
+
+    @Override
     public Void visitAggregateOperator(AggregateOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("aggregate ").append(str(op.getVariables())).append(" <- ");
         pprintExprList(op.getExpressions(), indent);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 21ea331..999a818 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -159,8 +159,9 @@
     }
 
     @Override
-    public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op) throws AlgebricksException {
-        printOperatorImpl(op);
+    public final IPlanPrettyPrinter printOperator(AbstractLogicalOperator op, boolean printInputs)
+            throws AlgebricksException {
+        printOperatorImpl(op, printInputs); // the "inputs" array is written only if printInputs
         flushContentToWriter();
         return this;
     }
@@ -172,7 +173,7 @@
                 jsonGenerator.writeStartArray();
             }
             for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-                printOperatorImpl((AbstractLogicalOperator) root.getValue());
+                printOperatorImpl((AbstractLogicalOperator) root.getValue(), true);
             }
             if (writeArrayOfRoots) {
                 jsonGenerator.writeEndArray();
@@ -182,7 +183,7 @@
         }
     }
 
-    private void printOperatorImpl(AbstractLogicalOperator op) throws AlgebricksException {
+    private void printOperatorImpl(AbstractLogicalOperator op, boolean printInputs) throws AlgebricksException {
         try {
             jsonGenerator.writeStartObject();
             op.accept(this, null);
@@ -192,10 +193,10 @@
                 jsonGenerator.writeStringField("physical-operator", pOp.toString());
             }
             jsonGenerator.writeStringField("execution-mode", op.getExecutionMode().toString());
-            if (!op.getInputs().isEmpty()) {
+            if (printInputs && !op.getInputs().isEmpty()) {
                 jsonGenerator.writeArrayFieldStart("inputs");
                 for (Mutable<ILogicalOperator> k : op.getInputs()) {
-                    printOperatorImpl((AbstractLogicalOperator) k.getValue());
+                    printOperatorImpl((AbstractLogicalOperator) k.getValue(), printInputs);
                 }
                 jsonGenerator.writeEndArray();
             }
@@ -206,6 +207,19 @@
     }
 
     @Override
+    public IPlanPrettyPrinter printExpression(ILogicalExpression expression) throws AlgebricksException {
+        // Writes the expression, as rendered by exprVisitor, as one JSON string value.
+        try {
+            jsonGenerator.writeString(expression.accept(exprVisitor, null));
+        } catch (IOException e) {
+            throw new AlgebricksException(e, ErrorCode.ERROR_PRINTING_PLAN);
+        }
+        // Flush as printOperator() does; otherwise the value can stay buffered in the generator.
+        flushContentToWriter();
+        return this;
+    }
+
+    @Override
     public Void visitAggregateOperator(AggregateOperator op, Void indent) throws AlgebricksException {
         try {
             jsonGenerator.writeStringField(OPERATOR_FIELD, "aggregate");
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 49d4748..c6589b0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -352,4 +352,9 @@
                 && rightChild.getExecutionMode().equals(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
         return unPartitioned ? StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR : partitionedPropertiesVector;
     }
+
+    public static boolean isMultiOutputOperator(ILogicalOperator op) { // true only for REPLICATE or SPLIT
+        LogicalOperatorTag opTag = op.getOperatorTag();
+        return opTag == LogicalOperatorTag.REPLICATE || opTag == LogicalOperatorTag.SPLIT;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
index 0ab21b4..33e27ac 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -28,4 +28,5 @@
     public static final int SORT_SAMPLES = 100;
     public static final boolean SORT_PARALLEL = true;
     public static final boolean INDEX_ONLY_DEFAULT = true;
+    public static final boolean SANITYCHECK_DEFAULT = false;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index 96e15da..4287b26 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -29,17 +29,24 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.util.LogRedactionUtil;
 
 public abstract class AbstractRuleController {
 
+    protected final boolean isTraceEnabled;
+
+    protected boolean isSanityCheckEnabled;
+
     protected IOptimizationContext context;
 
     public AbstractRuleController() {
+        isTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
     }
 
     public void setContext(IOptimizationContext context) {
         this.context = context;
+        this.isSanityCheckEnabled = context.getPhysicalOptimizationConfig().isSanityCheckEnabled();
     }
 
     /**
@@ -63,31 +70,20 @@
         return rewriteOperatorRef(opRef, rule, true, false);
     }
 
-    private String getPlanString(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
-        if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled() && context != null) {
-            IPlanPrettyPrinter prettyPrinter = context.getPrettyPrinter();
-            return prettyPrinter.reset().printOperator((AbstractLogicalOperator) opRef.getValue()).toString();
-        }
-        return null;
-    }
-
-    private void printRuleApplication(IAlgebraicRewriteRule rule, String beforePlan, String afterPlan) {
-        if (AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled()) {
-            AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">> Rule " + rule.getClass() + " fired.\n");
-            AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">> Before plan\n" + LogRedactionUtil.userData(beforePlan) + "\n");
-            AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">> After plan\n" + LogRedactionUtil.userData(afterPlan) + "\n");
-        }
-    }
-
     protected boolean rewriteOperatorRef(Mutable<ILogicalOperator> opRef, IAlgebraicRewriteRule rule,
             boolean enterNestedPlans, boolean fullDFS) throws AlgebricksException {
 
         String preBeforePlan = getPlanString(opRef);
+        sanityCheckBeforeRewrite(rule, opRef);
         if (rule.rewritePre(opRef, context)) {
             String preAfterPlan = getPlanString(opRef);
-            printRuleApplication(rule, preBeforePlan, preAfterPlan);
+            printRuleApplication(rule, "fired", preBeforePlan, preAfterPlan);
+            sanityCheckAfterRewrite(rule, opRef, true, preBeforePlan);
             return true;
+        } else {
+            sanityCheckAfterRewrite(rule, opRef, false, preBeforePlan);
         }
+
         boolean rewritten = false;
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
 
@@ -118,12 +114,80 @@
         }
 
         String postBeforePlan = getPlanString(opRef);
+        sanityCheckBeforeRewrite(rule, opRef);
         if (rule.rewritePost(opRef, context)) {
             String postAfterPlan = getPlanString(opRef);
-            printRuleApplication(rule, postBeforePlan, postAfterPlan);
+            printRuleApplication(rule, "fired", postBeforePlan, postAfterPlan);
+            sanityCheckAfterRewrite(rule, opRef, true, postBeforePlan);
             return true;
+        } else {
+            sanityCheckAfterRewrite(rule, opRef, false, postBeforePlan);
         }
 
         return rewritten;
     }
+
+    private void sanityCheckBeforeRewrite(IAlgebraicRewriteRule rule, Mutable<ILogicalOperator> opRef)
+            throws AlgebricksException {
+        if (isSanityCheckEnabled) { // no-op unless sanity checks were enabled via setContext()
+            sanityCheckBeforeRewriteImpl(rule, opRef);
+        }
+    }
+
+    private void sanityCheckAfterRewrite(IAlgebraicRewriteRule rule, Mutable<ILogicalOperator> opRef, boolean rewritten,
+            String beforePlan) throws AlgebricksException { // beforePlan is null when trace logging is off
+        if (isSanityCheckEnabled) { // no-op unless sanity checks were enabled via setContext()
+            sanityCheckAfterRewriteImpl(rule, opRef, rewritten, beforePlan);
+        }
+    }
+
+    private void sanityCheckBeforeRewriteImpl(IAlgebraicRewriteRule rule, Mutable<ILogicalOperator> opRef)
+            throws AlgebricksException {
+        try { // record a signature that is compared later if the rule reports "not fired"
+            context.getPlanStabilityVerifier().recordPlanSignature(opRef);
+        } catch (AlgebricksException e) { // rethrown with the rule name; only e's message is kept
+            throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                    String.format("Illegal state before rule %s. %s", rule.getClass().getName(), e.getMessage()));
+        }
+    }
+
+    private void sanityCheckAfterRewriteImpl(IAlgebraicRewriteRule rule, Mutable<ILogicalOperator> opRef,
+            boolean rewritten, String beforePlan) throws AlgebricksException {
+        if (rewritten) { // rule fired: verify the structure of the resulting plan
+            try {
+                context.getPlanStructureVerifier().verifyPlanStructure(opRef);
+            } catch (AlgebricksException e) {
+                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                        String.format("Fired rule %s produced illegal %s", rule.getClass().getName(), e.getMessage()));
+            }
+        } else { // rule did not fire: the plan must still match the signature recorded before it ran
+            try {
+                context.getPlanStabilityVerifier().comparePlanSignature(opRef);
+            } catch (AlgebricksException e) {
+                if (isTraceEnabled) { // trace the before/after plans before failing
+                    printRuleApplication(rule, "not fired, but failed sanity check: " + e.getMessage(), beforePlan,
+                            getPlanString(opRef));
+                }
+                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                        String.format("Non-fired rule %s unexpectedly %s", rule.getClass().getName(), e.getMessage()));
+            }
+        }
+        context.getPlanStabilityVerifier().discardPlanSignature(); // not reached when a check above throws
+    }
+
+    private String getPlanString(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        if (isTraceEnabled && context != null) { // isTraceEnabled is fixed when the controller is constructed
+            IPlanPrettyPrinter prettyPrinter = context.getPrettyPrinter();
+            return prettyPrinter.reset().printOperator((AbstractLogicalOperator) opRef.getValue()).toString();
+        }
+        return null; // no plan text is produced unless trace logging is on
+    }
+
+    private void printRuleApplication(IAlgebraicRewriteRule rule, String status, String beforePlan, String afterPlan) {
+        if (isTraceEnabled) { // plans go through LogRedactionUtil.userData() before being logged
+            AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">> Rule " + rule.getClass().getName() + " " + status + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">> Before plan\n" + LogRedactionUtil.userData(beforePlan) + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">> After plan\n" + LogRedactionUtil.userData(afterPlan) + "\n");
+        }
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 0fa0023..ecb8390 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -38,6 +38,8 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableEvalSizeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.algebra.plan.PlanStabilityVerifier;
+import org.apache.hyracks.algebricks.core.algebra.plan.PlanStructureVerifier;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
 import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
@@ -89,6 +91,8 @@
     private final IPlanPrettyPrinter prettyPrinter;
     private final IConflictingTypeResolver conflictingTypeResovler;
     private final IWarningCollector warningCollector;
+    private final PlanStructureVerifier planStructureVerifier;
+    private final PlanStabilityVerifier planStabilityVerifier;
 
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
@@ -106,6 +110,9 @@
         this.prettyPrinter = prettyPrinter;
         this.conflictingTypeResovler = conflictingTypeResovler;
         this.warningCollector = warningCollector;
+        boolean isSanityCheckEnabled = physicalOptimizationConfig.isSanityCheckEnabled();
+        this.planStructureVerifier = isSanityCheckEnabled ? new PlanStructureVerifier(prettyPrinter) : null;
+        this.planStabilityVerifier = isSanityCheckEnabled ? new PlanStabilityVerifier(prettyPrinter) : null;
     }
 
     @Override
@@ -337,4 +344,14 @@
     public IWarningCollector getWarningCollector() {
         return warningCollector;
     }
+
+    @Override
+    public PlanStructureVerifier getPlanStructureVerifier() {
+        return planStructureVerifier; // null when sanity checks are disabled (see constructor)
+    }
+
+    @Override
+    public PlanStabilityVerifier getPlanStabilityVerifier() {
+        return planStabilityVerifier; // null when sanity checks are disabled (see constructor)
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index 25d53b0..ab06488 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.util.LogRedactionUtil;
 import org.apache.logging.log4j.Level;
 
@@ -58,6 +59,7 @@
         }
 
         logPlanAt("Plan Before Optimization", Level.TRACE);
+        sanityCheckBeforeOptimization(plan);
         runLogicalOptimizationSets(plan, logicalRewrites);
         computeSchemaBottomUpForPlan(plan);
         runPhysicalOptimizationSets(plan, physicalRewrites);
@@ -118,4 +120,17 @@
         }
         runOptimizationSets(plan, optimizationSet);
     }
+
+    private void sanityCheckBeforeOptimization(ILogicalPlan plan) throws AlgebricksException {
+        if (context.getPhysicalOptimizationConfig().isSanityCheckEnabled()) {
+            try { // verify the structure under every root of the plan
+                for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
+                    context.getPlanStructureVerifier().verifyPlanStructure(opRef);
+                }
+            } catch (AlgebricksException e) { // rethrown as COMPILATION_ILLEGAL_STATE; only e's message is kept
+                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                        String.format("Initial plan contains illegal %s", e.getMessage()));
+            }
+        }
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 22eeb23..4f6fee0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -40,6 +40,7 @@
     private static final String SORT_PARALLEL = "SORT_PARALLEL";
     private static final String SORT_SAMPLES = "SORT_SAMPLES";
     private static final String INDEX_ONLY = "INDEX_ONLY";
+    private static final String SANITY_CHECK = "SANITY_CHECK";
 
     private Properties properties = new Properties();
 
@@ -181,6 +182,14 @@
         return getBoolean(INDEX_ONLY, AlgebricksConfig.INDEX_ONLY_DEFAULT);
     }
 
+    public void setSanityCheckEnabled(boolean sanityCheck) {
+        setBoolean(SANITY_CHECK, sanityCheck); // stored under the "SANITY_CHECK" property key
+    }
+
+    public boolean isSanityCheckEnabled() {
+        return getBoolean(SANITY_CHECK, AlgebricksConfig.SANITYCHECK_DEFAULT); // default value is false
+    }
+
     private void setInt(String property, int value) {
         properties.setProperty(property, Integer.toString(value));
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
deleted file mode 100644
index 4495314..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.utils;
-
-public class Substitution<T> {
-    public T substituted;
-    public T substitutedWith;
-
-    public Substitution(T substituted, T substitutedWith) {
-        this.substituted = substituted;
-        this.substitutedWith = substitutedWith;
-    }
-}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifierTest.java b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifierTest.java
new file mode 100644
index 0000000..7502788
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStabilityVerifierTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.plan;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public final class PlanStabilityVerifierTest extends PlanVerifierTestBase {
+
+    final PlanStabilityVerifier verifier = new PlanStabilityVerifier(planPrinter);
+
+    @Test
+    public void testVerifySuccess() throws Exception {
+        Mutable<ILogicalOperator> opRef1 = createSamplePlan1();
+        verifier.recordPlanSignature(opRef1);
+        verifier.comparePlanSignature(opRef1); // plan untouched since recording: must not throw
+    }
+
+    @Test
+    public void testAddOperator() throws Exception {
+        AssignOperator op1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> opRef1 = newMutable(op1);
+
+        verifier.recordPlanSignature(opRef1); // recorded while op1 has no inputs
+
+        AssignOperator op2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        op1.getInputs().add(newMutable(op2)); // change after recording: a new input reference
+
+        try {
+            verifier.comparePlanSignature(opRef1);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("created operator reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printOp(op2)));
+        }
+    }
+
+    @Test
+    public void testAddOperatorInNestedPlan() throws Exception {
+        AssignOperator nestedOp1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+
+        SubplanOperator op = new SubplanOperator(nestedOp1);
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef); // recorded while nestedOp1 has no inputs
+
+        AssignOperator nestedOp2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        nestedOp1.getInputs().add(newMutable(nestedOp2)); // change after recording, below the subplan
+
+        try {
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("created operator reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printOp(nestedOp2)));
+        }
+    }
+
+    @Test
+    public void testRemoveOperator() throws Exception {
+        AssignOperator op1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> opRef1 = newMutable(op1);
+
+        AssignOperator op2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        op1.getInputs().add(newMutable(op2));
+
+        verifier.recordPlanSignature(opRef1); // recorded with op2 as the input of op1
+
+        op1.getInputs().clear(); // change after recording: the input reference is gone
+
+        try {
+            verifier.comparePlanSignature(opRef1);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("deleted operator reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printOp(op2)));
+        }
+    }
+
+    @Test
+    public void testRemoveOperatorInNestedPlan() throws Exception {
+        AssignOperator nestedOp1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+
+        AssignOperator nestedOp2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        nestedOp1.getInputs().add(newMutable(nestedOp2));
+
+        SubplanOperator op = new SubplanOperator(nestedOp1);
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef); // recorded with nestedOp2 as the input of nestedOp1
+
+        nestedOp1.getInputs().clear(); // change after recording, below the subplan
+
+        try {
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("deleted operator reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printOp(nestedOp2)));
+        }
+    }
+
+    @Test
+    public void testReplaceOperator() throws Exception {
+        AssignOperator op1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> opRef1 = newMutable(op1);
+
+        AssignOperator op2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        op1.getInputs().add(newMutable(op2));
+
+        verifier.recordPlanSignature(opRef1); // recorded with op2 as the input of op1
+
+        AssignOperator op3 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        op1.getInputs().clear();
+        op1.getInputs().add(newMutable(op3)); // op2's reference swapped for a new one holding op3
+
+        try {
+            verifier.comparePlanSignature(opRef1);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("created operator reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printOp(op3)));
+        }
+    }
+
+    @Test
+    public void testReplaceOperatorInNestedPlan() throws Exception {
+        AssignOperator nestedOp1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+
+        AssignOperator nestedOp2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        nestedOp1.getInputs().add(newMutable(nestedOp2));
+
+        SubplanOperator op = new SubplanOperator(nestedOp1);
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef); // recorded with nestedOp2 as the input of nestedOp1
+
+        AssignOperator nestedOp3 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        nestedOp1.getInputs().clear();
+        nestedOp1.getInputs().add(newMutable(nestedOp3)); // reference swapped for a new one, below the subplan
+
+        try {
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("created operator reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printOp(nestedOp3)));
+        }
+    }
+
+    @Test
+    public void testAddExpression() throws Exception {
+        AssignOperator op = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef); // recorded with a single assigned expression
+
+        op.getVariables().add(newVar());
+        LogicalVariable testVar = newVar();
+        op.getExpressions().add(newMutable(newVarRef(testVar))); // change after recording: new expression reference
+
+        try {
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("created expression reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printVar(testVar)));
+        }
+    }
+
+    @Test
+    public void testRemoveExpression() throws Exception {
+        AssignOperator op = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        op.getVariables().add(newVar());
+        LogicalVariable testVar = newVar();
+        op.getExpressions().add(newMutable(newVarRef(testVar)));
+
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef); // recorded with two assigned expressions
+
+        op.getVariables().remove(1);
+        op.getExpressions().remove(1); // change after recording: drops the reference to testVar
+
+        try {
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("deleted expression reference"));
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains(printVar(testVar)));
+        }
+    }
+
+    @Test
+    public void testReplaceExpression() throws Exception {
+        AssignOperator op = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef);
+
+        LogicalVariable testVar = newVar();
+        op.getExpressions().get(0).setValue(newVarRef(testVar)); // same reference, new expression instance
+
+        try {
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            Assert.assertEquals(String.format("created expression instance (%s)", printVar(testVar)), e.getMessage());
+        }
+    }
+
+    @Test
+    public void testCycleBeforeRecord() {
+        AssignOperator op1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> opRef1 = newMutable(op1);
+
+        AssignOperator op2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        Mutable<ILogicalOperator> opRef2 = newMutable(op2);
+
+        op1.getInputs().add(opRef2);
+        op2.getInputs().add(opRef1); // op1 -> op2 -> op1: a cycle exists before anything is recorded
+
+        try { // NOTE(review): no Assert.fail() below, so this also passes if nothing is thrown - confirm
+            verifier.recordPlanSignature(opRef1);
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("cycle"));
+        }
+    }
+
+    @Test
+    public void testCycleBeforeCompare() throws Exception {
+        AssignOperator op1 = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> opRef1 = newMutable(op1);
+
+        AssignOperator op2 = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        Mutable<ILogicalOperator> opRef2 = newMutable(op2);
+
+        op1.getInputs().add(opRef2);
+
+        verifier.recordPlanSignature(opRef1); // recorded while the plan is still acyclic
+
+        op2.getInputs().add(new MutableObject<>(op1)); // closes a cycle through a new reference to op1
+
+        try { // NOTE(review): no Assert.fail() below, so this also passes if nothing is thrown - confirm
+            verifier.comparePlanSignature(opRef1);
+        } catch (AlgebricksException e) {
+            Assert.assertTrue(e.getMessage(), e.getMessage().contains("created operator reference"));
+        }
+    }
+
+    @Test
+    public void testApiDiscard() throws Exception {
+        AssignOperator op = newAssign(newVar(), newMutable(newVarRef(newVar())));
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef);
+        verifier.comparePlanSignature(opRef); // comparing does not consume the recorded signature
+
+        verifier.discardPlanSignature(); // after this, comparing is expected to be an illegal state
+
+        try {
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + IllegalStateException.class.getName());
+        } catch (IllegalStateException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testApiRecordAfterRecordFailure() throws Exception { // NOTE(review): no failure is expected here
+        AssignOperator op = newAssign(newVar(), newMutable(newVarRef(newVar())));
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        verifier.recordPlanSignature(opRef);
+        verifier.recordPlanSignature(opRef); // ok. the previously recorded state is discarded
+        verifier.comparePlanSignature(opRef); // compared against the second recording; must not throw
+    }
+
+    @Test
+    public void testApiCompareWithoutRecordFailure() throws Exception {
+        AssignOperator op = newAssign(newVar(), newMutable(newVarRef(newVar())));
+        Mutable<ILogicalOperator> opRef = newMutable(op);
+
+        try { // nothing was recorded on this verifier, so comparing is expected to be an illegal state
+            verifier.comparePlanSignature(opRef);
+            Assert.fail("Expected to catch " + IllegalStateException.class.getName());
+        } catch (IllegalStateException e) {
+            // expected
+        }
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifierTest.java b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifierTest.java
new file mode 100644
index 0000000..fa4061d
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanStructureVerifierTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.plan;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests that PlanStructureVerifier accepts a sample plan and rejects shared references, instances and cycles. */
+public final class PlanStructureVerifierTest extends PlanVerifierTestBase {
+
+    /** Verifier under test, constructed with the plan printer inherited from the base class. */
+    final PlanStructureVerifier verifier = new PlanStructureVerifier(planPrinter);
+
+    /** The plan returned by createSamplePlan1() must pass verification without throwing. */
+    @Test
+    public void testVerifySuccess() throws Exception {
+        verifier.verifyPlanStructure(createSamplePlan1());
+    }
+
+    /** A single expression reference used for both assignments of one operator must be reported. */
+    @Test
+    public void testSharedExpressionReferenceInSameOp() {
+        LogicalVariable sourceVar = newVar();
+        AssignOperator producer = newAssign(sourceVar, newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalExpression> sharedRef = newMutable(newVarRef(sourceVar));
+        // the very same Mutable is installed for both assigned variables
+        AssignOperator consumer = newAssign(newVar(), sharedRef, newVar(), sharedRef);
+        consumer.getInputs().add(newMutable(producer));
+
+        try {
+            verifier.verifyPlanStructure(newMutable(consumer));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared expression reference");
+            assertMessageContains(e, "to (" + printVar(sourceVar) + ")");
+            assertMessageContains(e, "in " + printOp(consumer));
+        }
+    }
+
+    /** A single expression instance wrapped by two separate references in one operator must be reported. */
+    @Test
+    public void testSharedExpressionInSameOp() {
+        LogicalVariable sourceVar = newVar();
+        AssignOperator producer = newAssign(sourceVar, newMutable(ConstantExpression.TRUE));
+        ILogicalExpression sharedExpr = newVarRef(sourceVar);
+        // two distinct Mutable wrappers, one expression object inside both
+        AssignOperator consumer = newAssign(newVar(), newMutable(sharedExpr), newVar(), newMutable(sharedExpr));
+        consumer.getInputs().add(newMutable(producer));
+
+        try {
+            verifier.verifyPlanStructure(newMutable(consumer));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared expression instance");
+            assertMessageContains(e, "(" + printVar(sourceVar) + ")");
+            assertMessageContains(e, "in " + printOp(consumer));
+        }
+    }
+
+    /** A single expression reference installed in two chained assign operators must be reported. */
+    @Test
+    public void testSharedExpressionReferenceBetweenOps() {
+        LogicalVariable sourceVar = newVar();
+        AssignOperator producer = newAssign(sourceVar, newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalExpression> sharedRef = newMutable(newVarRef(sourceVar));
+        // chain: secondUse reads firstUse, which reads producer; both uses hold the same Mutable
+        AssignOperator firstUse = newAssign(newVar(), sharedRef);
+        firstUse.getInputs().add(newMutable(producer));
+        AssignOperator secondUse = newAssign(newVar(), sharedRef);
+        secondUse.getInputs().add(newMutable(firstUse));
+
+        try {
+            verifier.verifyPlanStructure(newMutable(secondUse));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared expression reference");
+            assertMessageContains(e, "to (" + printVar(sourceVar) + ")");
+            assertMessageContains(e, printOp(firstUse));
+            assertMessageContains(e, printOp(secondUse));
+        }
+    }
+
+    /** A single expression instance wrapped separately in two chained assign operators must be reported. */
+    @Test
+    public void testSharedExpressionBetweenOps() {
+        LogicalVariable sourceVar = newVar();
+        AssignOperator producer = newAssign(sourceVar, newMutable(ConstantExpression.TRUE));
+        ILogicalExpression sharedExpr = newVarRef(sourceVar);
+        // chain: secondUse reads firstUse, which reads producer; each use wraps the same expression object
+        AssignOperator firstUse = newAssign(newVar(), newMutable(sharedExpr));
+        firstUse.getInputs().add(newMutable(producer));
+        AssignOperator secondUse = newAssign(newVar(), newMutable(sharedExpr));
+        secondUse.getInputs().add(newMutable(firstUse));
+
+        try {
+            verifier.verifyPlanStructure(newMutable(secondUse));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared expression instance");
+            assertMessageContains(e, "(" + printVar(sourceVar) + ")");
+            assertMessageContains(e, printOp(firstUse));
+            assertMessageContains(e, printOp(secondUse));
+        }
+    }
+
+    /** A single operator reference added twice to the inputs of one join must be reported. */
+    @Test
+    public void testSharedOperatorReferenceInSameOp() {
+        AssignOperator child = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        Mutable<ILogicalOperator> sharedRef = newMutable(child);
+        InnerJoinOperator join = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
+        // both join inputs are the very same Mutable
+        join.getInputs().add(sharedRef);
+        join.getInputs().add(sharedRef);
+
+        try {
+            verifier.verifyPlanStructure(newMutable(join));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared operator reference");
+            assertMessageContains(e, "to (" + printOp(child) + ")");
+            assertMessageContains(e, "in " + printOp(join));
+        }
+    }
+
+    /** A single operator instance wrapped by two separate references in one join must be reported. */
+    @Test
+    public void testSharedOperatorInSameOp() {
+        AssignOperator child = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        InnerJoinOperator join = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
+        join.getInputs().add(newMutable(child));
+        join.getInputs().add(newMutable(child));
+
+        try {
+            verifier.verifyPlanStructure(newMutable(join));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared operator instance");
+            assertMessageContains(e, "(" + printOp(child) + ")");
+            assertMessageContains(e, "in " + printOp(join));
+        }
+    }
+
+    /** A single operator reference used as input of two different joins must be reported. */
+    @Test
+    public void testSharedOperatorReferenceBetweenOps() {
+        InnerJoinOperator root = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
+        InnerJoinOperator leftJoin = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
+        root.getInputs().add(newMutable(leftJoin));
+        InnerJoinOperator rightJoin = new InnerJoinOperator(newMutable(ConstantExpression.FALSE));
+        root.getInputs().add(newMutable(rightJoin));
+        AssignOperator sharedChild = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        Mutable<ILogicalOperator> sharedRef = newMutable(sharedChild);
+        // both joins take the same Mutable as first input and an assign of their own as second input
+        leftJoin.getInputs().add(sharedRef);
+        leftJoin.getInputs().add(newMutable(newAssign(newVar(), newMutable(ConstantExpression.MISSING))));
+        rightJoin.getInputs().add(sharedRef);
+        rightJoin.getInputs().add(newMutable(newAssign(newVar(), newMutable(ConstantExpression.MISSING))));
+
+        try {
+            verifier.verifyPlanStructure(newMutable(root));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared operator reference");
+            assertMessageContains(e, "to (" + printOp(sharedChild) + ")");
+            assertMessageContains(e, printOp(leftJoin));
+            assertMessageContains(e, printOp(rightJoin));
+        }
+    }
+
+    /** A single operator instance wrapped separately as input of two different joins must be reported. */
+    @Test
+    public void testSharedOperatorBetweenOps() {
+        InnerJoinOperator root = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
+        InnerJoinOperator leftJoin = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
+        root.getInputs().add(newMutable(leftJoin));
+        InnerJoinOperator rightJoin = new InnerJoinOperator(newMutable(ConstantExpression.FALSE));
+        root.getInputs().add(newMutable(rightJoin));
+        AssignOperator sharedChild = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        // both joins wrap the same assign as first input and an assign of their own as second input
+        leftJoin.getInputs().add(newMutable(sharedChild));
+        leftJoin.getInputs().add(newMutable(newAssign(newVar(), newMutable(ConstantExpression.MISSING))));
+        rightJoin.getInputs().add(newMutable(sharedChild));
+        rightJoin.getInputs().add(newMutable(newAssign(newVar(), newMutable(ConstantExpression.MISSING))));
+
+        try {
+            verifier.verifyPlanStructure(newMutable(root));
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "shared operator instance");
+            assertMessageContains(e, "(" + printOp(sharedChild) + ")");
+            assertMessageContains(e, printOp(leftJoin));
+            assertMessageContains(e, printOp(rightJoin));
+        }
+    }
+
+    /** Two operators that are each other's input must be reported with a message mentioning "cycle". */
+    @Test
+    public void testCycle() {
+        AssignOperator first = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        Mutable<ILogicalOperator> firstRef = newMutable(first);
+        AssignOperator second = newAssign(newVar(), newMutable(ConstantExpression.FALSE));
+        Mutable<ILogicalOperator> secondRef = newMutable(second);
+        // close the loop: first reads second and second reads first
+        first.getInputs().add(secondRef);
+        second.getInputs().add(firstRef);
+
+        try {
+            verifier.verifyPlanStructure(firstRef);
+            Assert.fail("Expected to catch " + AlgebricksException.class.getName());
+        } catch (AlgebricksException e) {
+            assertMessageContains(e, "cycle");
+        }
+    }
+
+    /** Asserts that the message of {@code e} contains {@code fragment}; the message doubles as failure text. */
+    private static void assertMessageContains(AlgebricksException e, String fragment) {
+        Assert.assertTrue(e.getMessage(), e.getMessage().contains(fragment));
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanVerifierTestBase.java b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanVerifierTestBase.java
new file mode 100644
index 0000000..7bfbaa9
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/plan/PlanVerifierTestBase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.plan;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+
+/** Shared fixture for the plan verifier tests: a string plan printer plus factories for small plan pieces. */
+public abstract class PlanVerifierTestBase {
+    final IPlanPrettyPrinter planPrinter = PlanPrettyPrinter.createStringPlanPrettyPrinter();
+
+    int varCounter;
+
+    EmptyTupleSourceOperator newETS() {
+        return new EmptyTupleSourceOperator();
+    }
+
+    AssignOperator newAssign(LogicalVariable var, Mutable<ILogicalExpression> exprRef) {
+        return new AssignOperator(var, exprRef);
+    }
+
+    /** Creates an assign for (var1, exprRef1), then appends var2 and exprRef2 to its variables and expressions. */
+    AssignOperator newAssign(LogicalVariable var1, Mutable<ILogicalExpression> exprRef1, LogicalVariable var2,
+            Mutable<ILogicalExpression> exprRef2) {
+        AssignOperator assign = new AssignOperator(var1, exprRef1);
+        assign.getVariables().add(var2);
+        assign.getExpressions().add(exprRef2);
+        return assign;
+    }
+
+    <T> Mutable<T> newMutable(T item) {
+        return new MutableObject<T>(item);
+    }
+
+    VariableReferenceExpression newVarRef(LogicalVariable var) {
+        return new VariableReferenceExpression(var);
+    }
+
+    /** Creates a variable from the current value of {@code varCounter} and advances the counter by one. */
+    LogicalVariable newVar() {
+        int id = varCounter;
+        varCounter = id + 1;
+        return new LogicalVariable(id);
+    }
+
+    /** Prints {@code op} through the reset plan printer; an AlgebricksException is rethrown unchecked. */
+    String printOp(ILogicalOperator op) {
+        try {
+            return planPrinter.reset().printOperator((AbstractLogicalOperator) op, false).toString();
+        } catch (AlgebricksException cause) {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    /** Prints {@code expr} through the reset plan printer; an AlgebricksException is rethrown unchecked. */
+    String printExpr(ILogicalExpression expr) {
+        try {
+            return planPrinter.reset().printExpression(expr).toString();
+        } catch (AlgebricksException cause) {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    String printVar(LogicalVariable var) {
+        return printExpr(newVarRef(var));
+    }
+
+    /** Sample plan: assign over subplan over inner join of two assigns that both read one replicate over ETS. */
+    Mutable<ILogicalOperator> createSamplePlan1() {
+        AssignOperator root = newAssign(newVar(), newMutable(ConstantExpression.TRUE));
+        SubplanOperator subplan = new SubplanOperator(newAssign(newVar(), newMutable(ConstantExpression.TRUE)));
+        InnerJoinOperator join = new InnerJoinOperator(newMutable(ConstantExpression.TRUE));
+        AssignOperator leftAssign = new AssignOperator(newVar(), newMutable(ConstantExpression.TRUE));
+        AssignOperator rightAssign = new AssignOperator(newVar(), newMutable(ConstantExpression.FALSE));
+        ReplicateOperator replicate = new ReplicateOperator(2);
+        // wire the operators top-down; every edge below gets its own Mutable wrapper
+        root.getInputs().add(newMutable(subplan));
+        subplan.getInputs().add(newMutable(join));
+        join.getInputs().add(newMutable(leftAssign));
+        join.getInputs().add(newMutable(rightAssign));
+        leftAssign.getInputs().add(newMutable(replicate));
+        replicate.getOutputs().add(newMutable(leftAssign));
+        rightAssign.getInputs().add(newMutable(replicate));
+        replicate.getOutputs().add(newMutable(rightAssign));
+        replicate.getInputs().add(newMutable(newETS()));
+        return newMutable(root);
+    }
+}