diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
index bd5b7c4..42af276 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceLSMComponentFilterRule.java
@@ -320,16 +320,13 @@
 
     private IntersectOperator createIntersectWithFilter(List<LogicalVariable> outputFilterVars,
             List<List<LogicalVariable>> filterVars, IntersectOperator intersect) throws AlgebricksException {
-        List<LogicalVariable> outputVars = new ArrayList<>();
-        outputVars.addAll(intersect.getOutputVars());
-        outputVars.addAll(outputFilterVars);
-
-        List<List<LogicalVariable>> compareVars = new ArrayList<>(intersect.getNumInput());
-        for (int i = 0; i < intersect.getNumInput(); i++) {
-            compareVars.add(new ArrayList<>(intersect.getCompareVariables(i)));
+        int nInput = intersect.getNumInput();
+        List<List<LogicalVariable>> inputCompareVars = new ArrayList<>(nInput);
+        for (int i = 0; i < nInput; i++) {
+            inputCompareVars.add(new ArrayList<>(intersect.getInputCompareVariables(i)));
         }
-
-        IntersectOperator intersectWithFilter = new IntersectOperator(outputVars, compareVars, filterVars);
+        IntersectOperator intersectWithFilter = new IntersectOperator(intersect.getOutputCompareVariables(),
+                outputFilterVars, inputCompareVars, filterVars);
         intersectWithFilter.setSourceLocation(intersect.getSourceLocation());
         intersectWithFilter.getInputs().addAll(intersect.getInputs());
         return intersectWithFilter;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index f21eb42..78c4c5e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -573,15 +573,17 @@
     @Override
     public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
         visitMultiInputOperator(op);
-        List<LogicalVariable> outputVars = op.getOutputVars();
-        for (int i = 0; i < op.getNumInput(); i++) {
-            List<LogicalVariable> inputVars = op.getInputVariables(i);
-            if (inputVars.size() != outputVars.size()) {
-                throw new CompilationException(ErrorCode.COMPILATION_ERROR, op.getSourceLocation(),
-                        "The cardinality of input and output are not equal for Intersection");
-            }
-            for (int j = 0; j < inputVars.size(); j++) {
-                updateInputToOutputVarMapping(inputVars.get(j), outputVars.get(j), false);
+
+        int nInput = op.getNumInput();
+        boolean hasExtraVars = op.hasExtraVariables();
+        List<LogicalVariable> outputCompareVars = op.getOutputCompareVariables();
+        List<LogicalVariable> outputExtraVars = op.getOutputExtraVariables();
+        for (int i = 0; i < nInput; i++) {
+            updateInputToOutputVarMappings(op.getInputCompareVariables(i), outputCompareVars, false,
+                    op.getSourceLocation());
+            if (hasExtraVars) {
+                updateInputToOutputVarMappings(op.getInputExtraVariables(i), outputExtraVars, false,
+                        op.getSourceLocation());
             }
         }
         return op;
@@ -764,6 +766,18 @@
         }
     }
 
+    private void updateInputToOutputVarMappings(List<LogicalVariable> oldVarList, List<LogicalVariable> newVarList,
+            boolean inNts, SourceLocation sourceLoc) throws CompilationException {
+        int oldVarCount = oldVarList.size();
+        if (oldVarCount != newVarList.size()) {
+            throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
+                    "The cardinality of input and output are not equal");
+        }
+        for (int i = 0; i < oldVarCount; i++) {
+            updateInputToOutputVarMapping(oldVarList.get(i), newVarList.get(i), inNts);
+        }
+    }
+
     private void addPrimaryKeys(Map<LogicalVariable, LogicalVariable> varMap) {
         for (Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
             List<LogicalVariable> dependencyVars = context.findPrimaryKey(entry.getKey());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
index 4af2d86..36d2db5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
@@ -20,9 +20,10 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
+import org.apache.commons.collections4.ListUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -38,41 +39,52 @@
 
 public class IntersectOperator extends AbstractLogicalOperator {
 
-    private final List<List<LogicalVariable>> inputVars;
-    private final List<List<LogicalVariable>> compareVars;
-    private final List<LogicalVariable> outputVars;
-    private List<List<LogicalVariable>> extraVars;
+    private final List<LogicalVariable> outputCompareVars;
+    private final List<List<LogicalVariable>> inputCompareVars;
 
-    public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> compareVars)
+    private final List<LogicalVariable> outputExtraVars;
+    private final List<List<LogicalVariable>> inputExtraVars;
+
+    public IntersectOperator(List<LogicalVariable> outputCompareVars, List<List<LogicalVariable>> inputCompareVars)
             throws AlgebricksException {
-        this(outputVars, compareVars,
-                compareVars.stream().map(vars -> new ArrayList<LogicalVariable>()).collect(Collectors.toList()));
+        this(outputCompareVars, Collections.emptyList(), inputCompareVars, Collections.emptyList());
     }
 
-    public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> compareVars,
-            List<List<LogicalVariable>> extraVars) throws AlgebricksException {
-        int numCompareFields = compareVars.get(0).size();
-        if (compareVars.stream().anyMatch(vlist -> vlist.size() != numCompareFields)) {
-            throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+    public IntersectOperator(List<LogicalVariable> outputCompareVars, List<LogicalVariable> outputExtraVars,
+            List<List<LogicalVariable>> inputCompareVars, List<List<LogicalVariable>> inputExtraVars)
+            throws AlgebricksException {
+        int numCompareVars = outputCompareVars.size();
+        for (List<LogicalVariable> vars : inputCompareVars) {
+            if (vars.size() != numCompareVars) {
+                throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+            }
         }
-        int numExtraFields = extraVars.get(0).size();
-        if (extraVars.stream().anyMatch(vlist -> vlist.size() != numExtraFields)) {
-            throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
-        }
-        if (outputVars.size() != numCompareFields + numExtraFields) {
-            throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+        if (outputExtraVars == null || outputExtraVars.isEmpty()) {
+            if (inputExtraVars != null && !inputExtraVars.isEmpty()) {
+                throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+            }
+        } else {
+            if (inputExtraVars == null || inputExtraVars.isEmpty()) {
+                throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+            }
+            int numExtraVars = outputExtraVars.size();
+            for (List<LogicalVariable> vars : inputExtraVars) {
+                if (vars.size() != numExtraVars) {
+                    throw AlgebricksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
+                }
+            }
         }
 
-        this.outputVars = new ArrayList<>(outputVars);
-        this.compareVars = new ArrayList<>(compareVars);
-        this.inputVars = new ArrayList<>(compareVars.size());
-        for (List<LogicalVariable> vars : compareVars) {
-            this.inputVars.add(new ArrayList<>(vars));
+        this.outputCompareVars = new ArrayList<>(outputCompareVars);
+        this.inputCompareVars = new ArrayList<>(inputCompareVars);
+        this.outputExtraVars = new ArrayList<>();
+        if (outputExtraVars != null) {
+            this.outputExtraVars.addAll(outputExtraVars);
         }
-        for (int i = 0; i < extraVars.size(); i++) {
-            this.inputVars.get(i).addAll(extraVars.get(i));
+        this.inputExtraVars = new ArrayList<>();
+        if (inputExtraVars != null) {
+            this.inputExtraVars.addAll(inputExtraVars);
         }
-        this.extraVars = extraVars;
     }
 
     @Override
@@ -81,8 +93,8 @@
     }
 
     @Override
-    public void recomputeSchema() throws AlgebricksException {
-        schema = outputVars;
+    public void recomputeSchema() {
+        schema = concatOutputVariables();
     }
 
     @Override
@@ -103,52 +115,59 @@
 
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return new FilteredVariablePropagationPolicy(outputVars);
+        return new FilteredVariablePropagationPolicy(concatOutputVariables());
     }
 
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         IVariableTypeEnvironment typeEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());
 
-        List<LogicalVariable> compareVars0 = compareVars.get(0);
-        for (int i = 1; i < inputs.size(); i++) {
-            checkTypeConsistency(typeEnv, compareVars0, ctx.getOutputTypeEnvironment(inputs.get(i).getValue()),
-                    compareVars.get(i));
+        List<LogicalVariable> inputCompareVars0 = inputCompareVars.get(0);
+        for (int i = 1, n = inputs.size(); i < n; i++) {
+            checkTypeConsistency(typeEnv, inputCompareVars0, ctx.getOutputTypeEnvironment(inputs.get(i).getValue()),
+                    inputCompareVars.get(i));
         }
 
         IVariableTypeEnvironment env =
                 new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider());
-        int i = 0;
-        for (; i < compareVars0.size(); i++) {
-            env.setVarType(outputVars.get(i), typeEnv.getVarType(compareVars0.get(i)));
+        for (int i = 0, n = outputCompareVars.size(); i < n; i++) {
+            env.setVarType(outputCompareVars.get(i), typeEnv.getVarType(inputCompareVars0.get(i)));
         }
-        if (extraVars != null) {
-            List<LogicalVariable> extraVars0 = extraVars.get(0);
-            for (int k = 0; k < extraVars0.size(); k++) {
-                env.setVarType(outputVars.get(i + k), typeEnv.getVarType(extraVars0.get(k)));
+        if (hasExtraVariables()) {
+            List<LogicalVariable> inputExtraVars0 = inputExtraVars.get(0);
+            for (int i = 0, n = outputExtraVars.size(); i < n; i++) {
+                env.setVarType(outputExtraVars.get(i), typeEnv.getVarType(inputExtraVars0.get(i)));
             }
         }
         return env;
     }
 
-    public List<LogicalVariable> getOutputVars() {
-        return outputVars;
-    }
-
     public int getNumInput() {
-        return compareVars.size();
+        return inputCompareVars.size();
     }
 
-    public List<LogicalVariable> getCompareVariables(int inputIndex) {
-        return compareVars.get(inputIndex);
+    public boolean hasExtraVariables() {
+        return !outputExtraVars.isEmpty();
     }
 
-    public List<List<LogicalVariable>> getExtraVariables() {
-        return extraVars;
+    public List<LogicalVariable> getInputCompareVariables(int inputIndex) {
+        return inputCompareVars.get(inputIndex);
     }
 
-    public List<LogicalVariable> getInputVariables(int inputIndex) {
-        return this.inputVars.get(inputIndex);
+    public List<LogicalVariable> getInputExtraVariables(int inputIndex) {
+        return inputExtraVars.get(inputIndex);
+    }
+
+    public List<LogicalVariable> getOutputCompareVariables() {
+        return outputCompareVars;
+    }
+
+    public List<LogicalVariable> getOutputExtraVariables() {
+        return outputExtraVars;
+    }
+
+    private List<LogicalVariable> concatOutputVariables() {
+        return ListUtils.union(outputCompareVars, outputExtraVars);
     }
 
     private void checkTypeConsistency(IVariableTypeEnvironment expected, List<LogicalVariable> expectedVariables,
@@ -165,5 +184,4 @@
             }
         }
     }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 3a1d085..7b3f53a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -371,20 +371,35 @@
             return Boolean.FALSE;
         }
         IntersectOperator intersetOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
-        List<LogicalVariable> variables = op.getOutputVars();
-        List<LogicalVariable> variablesArg = intersetOpArg.getOutputVars();
-        if (variables.size() != variablesArg.size()) {
+        List<LogicalVariable> outputCompareVars = op.getOutputCompareVariables();
+        List<LogicalVariable> outputCompareVarsArg = intersetOpArg.getOutputCompareVariables();
+        if (outputCompareVars.size() != outputCompareVarsArg.size()) {
             return Boolean.FALSE;
         }
-        if (!VariableUtilities.varListEqualUnordered(variables, variablesArg)) {
+        if (!VariableUtilities.varListEqualUnordered(outputCompareVars, outputCompareVarsArg)) {
+            return Boolean.FALSE;
+        }
+        boolean hasExtraVars = op.hasExtraVariables();
+        List<LogicalVariable> outputExtraVars = op.getOutputExtraVariables();
+        List<LogicalVariable> outputExtraVarsArg = intersetOpArg.getOutputExtraVariables();
+        if (outputExtraVars.size() != outputExtraVarsArg.size()) {
+            return Boolean.FALSE;
+        }
+        if (!VariableUtilities.varListEqualUnordered(outputExtraVars, outputExtraVarsArg)) {
             return Boolean.FALSE;
         }
 
-        if (op.getNumInput() != intersetOpArg.getNumInput()) {
+        int nInput = op.getNumInput();
+        if (nInput != intersetOpArg.getNumInput()) {
             return Boolean.FALSE;
         }
-        for (int i = 0; i < op.getNumInput(); i++) {
-            if (!VariableUtilities.varListEqualUnordered(op.getInputVariables(i), intersetOpArg.getInputVariables(i))) {
+        for (int i = 0; i < nInput; i++) {
+            if (!VariableUtilities.varListEqualUnordered(op.getInputCompareVariables(i),
+                    intersetOpArg.getInputCompareVariables(i))) {
+                return Boolean.FALSE;
+            }
+            if (hasExtraVars && !VariableUtilities.varListEqualUnordered(op.getInputExtraVariables(i),
+                    intersetOpArg.getInputExtraVariables(i))) {
                 return Boolean.FALSE;
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index ed2152c..bfeb83f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -505,18 +505,37 @@
             return;
         }
         IntersectOperator opArg = (IntersectOperator) arg;
-        if (op.getNumInput() != opArg.getNumInput()) {
+        int nInput = op.getNumInput();
+        if (nInput != opArg.getNumInput()) {
             return;
         }
-        for (int i = 0; i < op.getNumInput(); i++) {
-            for (int j = 0; j < op.getInputVariables(i).size(); j++) {
-                if (!varEquivalent(op.getInputVariables(i).get(j), opArg.getInputVariables(i).get(j))) {
+        boolean hasExtraVars = op.hasExtraVariables();
+        if (hasExtraVars && !opArg.hasExtraVariables()) {
+            return;
+        }
+        for (int i = 0; i < nInput; i++) {
+            List<LogicalVariable> inputCompareVars = op.getInputCompareVariables(i);
+            List<LogicalVariable> inputCompareVarsArg = opArg.getInputCompareVariables(i);
+            for (int j = 0, n = inputCompareVars.size(); j < n; j++) {
+                if (!varEquivalent(inputCompareVars.get(j), inputCompareVarsArg.get(j))) {
                     return;
                 }
             }
-
+            if (hasExtraVars) {
+                List<LogicalVariable> inputExtraVars = op.getInputExtraVariables(i);
+                List<LogicalVariable> inputExtraVarsArg = opArg.getInputExtraVariables(i);
+                for (int j = 0, n = inputExtraVars.size(); j < n; j++) {
+                    if (!varEquivalent(inputExtraVars.get(j), inputExtraVarsArg.get(j))) {
+                        return;
+                    }
+                }
+            }
         }
-        mapVariables(op.getOutputVars(), opArg.getOutputVars());
+
+        mapVariables(op.getOutputCompareVariables(), opArg.getOutputCompareVariables());
+        if (hasExtraVars) {
+            mapVariables(op.getOutputExtraVariables(), opArg.getOutputExtraVariables());
+        }
     }
 
     private boolean varEquivalent(LogicalVariable left, LogicalVariable right) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 609f1fb..1c02a6c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -520,30 +520,25 @@
     @Override
     public ILogicalOperator visitIntersectOperator(IntersectOperator op, ILogicalOperator arg)
             throws AlgebricksException {
-        List<List<LogicalVariable>> liveVarsInInputs = getLiveVarsInInputs(op);
-        List<LogicalVariable> outputCopy = new ArrayList<>();
-        for (LogicalVariable var : op.getOutputVars()) {
-            outputCopy.add(deepCopyVariable(var));
+        int nInput = op.getNumInput();
+        List<LogicalVariable> outputCompareVarsCopy = deepCopyVariableList(op.getOutputCompareVariables());
+        boolean hasExtraVars = op.hasExtraVariables();
+        List<LogicalVariable> outputExtraVarsCopy =
+                hasExtraVars ? deepCopyVariableList(op.getOutputExtraVariables()) : null;
+        List<List<LogicalVariable>> inputCompareVarsCopy = new ArrayList<>(nInput);
+        List<List<LogicalVariable>> inputExtraVarsCopy = hasExtraVars ? new ArrayList<>(nInput) : null;
+        for (int i = 0; i < nInput; i++) {
+            inputCompareVarsCopy.add(deepCopyVariableList(op.getInputCompareVariables(i)));
+            if (hasExtraVars) {
+                inputExtraVarsCopy.add(deepCopyVariableList(op.getInputExtraVariables(i)));
+            }
         }
-        IntersectOperator opCopy = new IntersectOperator(outputCopy, liveVarsInInputs);
+        IntersectOperator opCopy = new IntersectOperator(outputCompareVarsCopy, outputExtraVarsCopy,
+                inputCompareVarsCopy, inputExtraVarsCopy);
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }
 
-    private List<List<LogicalVariable>> getLiveVarsInInputs(AbstractLogicalOperator op) throws AlgebricksException {
-        List<Mutable<ILogicalOperator>> copiedInputs = new ArrayList<>();
-        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-            copiedInputs.add(deepCopyOperatorReference(childRef, null));
-        }
-        List<List<LogicalVariable>> liveVarsInInputs = new ArrayList<>();
-        for (Mutable<ILogicalOperator> inputOpRef : copiedInputs) {
-            List<LogicalVariable> liveVars = new ArrayList<>();
-            VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars);
-            liveVarsInInputs.add(liveVars);
-        }
-        return liveVarsInInputs;
-    }
-
     @Override
     public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg)
             throws AlgebricksException {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 33e1bfa..62ea79d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -209,18 +209,19 @@
 
     @Override
     public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
-        List<LogicalVariable> newOutputVars = new ArrayList<>(op.getOutputVars());
-        int numInput = op.getNumInput();
-        List<List<LogicalVariable>> newCompareVars = new ArrayList<>(numInput);
-        List<List<LogicalVariable>> extraVars = op.getExtraVariables();
-        List<List<LogicalVariable>> newExtraVars = extraVars != null ? new ArrayList<>(numInput) : null;
-        for (int i = 0; i < numInput; i++) {
-            newCompareVars.add(new ArrayList<>(op.getCompareVariables(i)));
-            if (extraVars != null) {
-                newExtraVars.add(new ArrayList<>(extraVars.get(i)));
+        int nInput = op.getNumInput();
+        boolean hasExtraVars = op.hasExtraVariables();
+        List<LogicalVariable> newOutputCompareVars = new ArrayList<>(op.getOutputCompareVariables());
+        List<LogicalVariable> newOutputExtraVars = hasExtraVars ? new ArrayList<>(op.getOutputExtraVariables()) : null;
+        List<List<LogicalVariable>> newInputCompareVars = new ArrayList<>(nInput);
+        List<List<LogicalVariable>> newInputExtraVars = hasExtraVars ? new ArrayList<>(nInput) : null;
+        for (int i = 0; i < nInput; i++) {
+            newInputCompareVars.add(new ArrayList<>(op.getInputCompareVariables(i)));
+            if (hasExtraVars) {
+                newInputExtraVars.add(new ArrayList<>(op.getInputExtraVariables(i)));
             }
         }
-        return new IntersectOperator(newOutputVars, newCompareVars, newExtraVars);
+        return new IntersectOperator(newOutputCompareVars, newOutputExtraVars, newInputCompareVars, newInputExtraVars);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 7084a60..5d9d7895 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -221,7 +221,10 @@
 
     @Override
     public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
-        producedVariables.addAll(op.getOutputVars());
+        producedVariables.addAll(op.getOutputCompareVariables());
+        if (op.hasExtraVariables()) {
+            producedVariables.addAll(op.getOutputExtraVariables());
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index dfd88ec..f7c7287 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -271,16 +271,15 @@
     @Override
     public Void visitIntersectOperator(IntersectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        for (int i = 0; i < op.getOutputVars().size(); i++) {
-            if (op.getOutputVars().get(i).equals(pair.first)) {
-                op.getOutputVars().set(i, pair.second);
-            }
+        boolean hasExtraVars = op.hasExtraVariables();
+        substInArray(op.getOutputCompareVariables(), pair.first, pair.second);
+        if (hasExtraVars) {
+            substInArray(op.getOutputExtraVariables(), pair.first, pair.second);
         }
-        for (int i = 0; i < op.getNumInput(); i++) {
-            for (int j = 0; j < op.getInputVariables(i).size(); j++) {
-                if (op.getInputVariables(i).get(j).equals(pair.first)) {
-                    op.getInputVariables(i).set(j, pair.second);
-                }
+        for (int i = 0, n = op.getNumInput(); i < n; i++) {
+            substInArray(op.getInputCompareVariables(i), pair.first, pair.second);
+            if (hasExtraVars) {
+                substInArray(op.getInputExtraVariables(i), pair.first, pair.second);
             }
         }
         return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e298200..e20e534 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -288,12 +288,20 @@
 
     @Override
     public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        boolean hasExtraVars = op.hasExtraVariables();
         for (int i = 0; i < op.getNumInput(); i++) {
-            for (LogicalVariable var : op.getInputVariables(i)) {
+            for (LogicalVariable var : op.getInputCompareVariables(i)) {
                 if (!usedVariables.contains(var)) {
                     usedVariables.add(var);
                 }
             }
+            if (hasExtraVars) {
+                for (LogicalVariable var : op.getInputExtraVariables(i)) {
+                    if (!usedVariables.contains(var)) {
+                        usedVariables.add(var);
+                    }
+                }
+            }
         }
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index d484fdf..3cf0fb9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -66,17 +67,18 @@
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
             IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         IntersectOperator intersectOp = (IntersectOperator) iop;
-        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[intersectOp.getNumInput()];
-        for (int i = 0; i < intersectOp.getNumInput(); i++) {
+        int numInput = intersectOp.getNumInput();
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[numInput];
+        for (int i = 0; i < numInput; i++) {
             List<ILocalStructuralProperty> localProps = new ArrayList<>();
             List<OrderColumn> orderColumns = new ArrayList<>();
-            for (LogicalVariable column : intersectOp.getCompareVariables(i)) {
+            for (LogicalVariable column : intersectOp.getInputCompareVariables(i)) {
                 orderColumns.add(new OrderColumn(column, OrderOperator.IOrder.OrderKind.ASC));
             }
             localProps.add(new LocalOrderProperty(orderColumns));
             IPartitioningProperty pp = null;
             if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-                Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getCompareVariables(i));
+                Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputCompareVariables(i));
                 pp = new UnorderedPartitionedProperty(partitioningVariables, null);
             }
             pv[i] = new StructuralPropertiesVector(pp, localProps);
@@ -85,21 +87,22 @@
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context)
-            throws AlgebricksException {
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context) {
         IntersectOperator op = (IntersectOperator) iop;
         IPartitioningProperty pp =
                 op.getInputs().get(0).getValue().getDeliveredPhysicalProperties().getPartitioningProperty();
 
-        HashMap<LogicalVariable, LogicalVariable> varMaps = new HashMap<>(op.getOutputVars().size());
-        for (int i = 0; i < op.getOutputVars().size(); i++) {
-            varMaps.put(op.getInputVariables(0).get(i), op.getOutputVars().get(i));
+        List<LogicalVariable> outputCompareVars = op.getOutputCompareVariables();
+        int numCompareVars = outputCompareVars.size();
+        Map<LogicalVariable, LogicalVariable> varMaps = new HashMap<>(numCompareVars);
+        for (int i = 0; i < numCompareVars; i++) {
+            varMaps.put(op.getInputCompareVariables(0).get(i), outputCompareVars.get(i));
         }
         pp.substituteColumnVars(varMaps);
 
         List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
         List<OrderColumn> orderColumns = new ArrayList<>();
-        for (LogicalVariable var : op.getOutputVars()) {
+        for (LogicalVariable var : outputCompareVars) {
             orderColumns.add(new OrderColumn(var, OrderOperator.IOrder.OrderKind.ASC));
         }
         propsLocal.add(new LocalOrderProperty(orderColumns));
@@ -115,30 +118,31 @@
         int nInput = logicalOp.getNumInput();
         int[][] compareFields = new int[nInput][];
 
-        List<LogicalVariable> compareVars0 = logicalOp.getCompareVariables(0);
+        List<LogicalVariable> inputCompareVars0 = logicalOp.getInputCompareVariables(0);
         IVariableTypeEnvironment inputTypeEnv0 = context.getTypeEnvironment(logicalOp.getInputs().get(0).getValue());
         IBinaryComparatorFactory[] comparatorFactories =
-                JobGenHelper.variablesToAscBinaryComparatorFactories(compareVars0, inputTypeEnv0, context);
+                JobGenHelper.variablesToAscBinaryComparatorFactories(inputCompareVars0, inputTypeEnv0, context);
 
         INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
         INormalizedKeyComputerFactory nkcf = null;
         if (nkcfProvider != null) {
-            Object type = inputTypeEnv0.getVarType(compareVars0.get(0));
+            Object type = inputTypeEnv0.getVarType(inputCompareVars0.get(0));
             if (type != null) {
                 nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, true);
             }
         }
 
-        for (int i = 0; i < logicalOp.getNumInput(); i++) {
-            compareFields[i] = JobGenHelper.variablesToFieldIndexes(logicalOp.getCompareVariables(i), inputSchemas[i]);
+        for (int i = 0; i < nInput; i++) {
+            compareFields[i] =
+                    JobGenHelper.variablesToFieldIndexes(logicalOp.getInputCompareVariables(i), inputSchemas[i]);
         }
 
         int[][] extraFields = null;
-        if (logicalOp.getExtraVariables() != null) {
-            extraFields = new int[logicalOp.getNumInput()][];
-            for (int i = 0; i < logicalOp.getNumInput(); i++) {
+        if (logicalOp.hasExtraVariables()) {
+            extraFields = new int[nInput][];
+            for (int i = 0; i < nInput; i++) {
                 extraFields[i] =
-                        JobGenHelper.variablesToFieldIndexes(logicalOp.getExtraVariables().get(i), inputSchemas[i]);
+                        JobGenHelper.variablesToFieldIndexes(logicalOp.getInputExtraVariables(i), inputSchemas[i]);
             }
         }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 2dd55c1..142d4cc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -250,46 +250,24 @@
 
     @Override
     public Void visitIntersectOperator(IntersectOperator op, Integer indent) throws AlgebricksException {
-        addIndent(indent).append("intersect (");
-
-        buffer.append('[');
-        for (int i = 0; i < op.getOutputVars().size(); i++) {
+        addIndent(indent).append("intersect ");
+        pprintVarList(op.getOutputCompareVariables());
+        if (op.hasExtraVariables()) {
+            buffer.append(" extra ");
+            pprintVarList(op.getOutputExtraVariables());
+        }
+        buffer.append(" <- [");
+        for (int i = 0, n = op.getNumInput(); i < n; i++) {
             if (i > 0) {
                 buffer.append(", ");
             }
-            buffer.append(str(op.getOutputVars().get(i)));
+            pprintVarList(op.getInputCompareVariables(i));
+            if (op.hasExtraVariables()) {
+                buffer.append(" extra ");
+                pprintVarList(op.getInputExtraVariables(i));
+            }
         }
-        buffer.append("] <- [");
-        for (int i = 0; i < op.getNumInput(); i++) {
-            if (i > 0) {
-                buffer.append(", ");
-            }
-            buffer.append('[');
-            for (int j = 0; j < op.getInputVariables(i).size(); j++) {
-                if (j > 0) {
-                    buffer.append(", ");
-                }
-                buffer.append(str(op.getInputVariables(i).get(j)));
-            }
-            buffer.append("] cmp [");
-            for (int j = 0; j < op.getCompareVariables(i).size(); j++) {
-                if (j > 0) {
-                    buffer.append(", ");
-                }
-                buffer.append(str(op.getCompareVariables(i).get(j)));
-            }
-            if (op.getExtraVariables() != null) {
-                buffer.append("] ext [");
-                for (int j = 0; j < op.getExtraVariables().get(i).size(); j++) {
-                    if (j > 0) {
-                        buffer.append(", ");
-                    }
-                    buffer.append(str(op.getExtraVariables().get(i).get(j)));
-                }
-            }
-            buffer.append(']');
-        }
-        buffer.append("])");
+        buffer.append(']');
         return null;
     }
 
@@ -593,6 +571,20 @@
         }
     }
 
+    protected void pprintVarList(List<LogicalVariable> variables) throws AlgebricksException {
+        buffer.append('[');
+        boolean first = true;
+        for (LogicalVariable var : variables) {
+            if (first) {
+                first = false;
+            } else {
+                buffer.append(", ");
+            }
+            buffer.append(str(var));
+        }
+        buffer.append(']');
+    }
+
     protected void pprintExprList(List<Mutable<ILogicalExpression>> expressions, Integer indent)
             throws AlgebricksException {
         buffer.append("[");
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index 266cfd4..14c3549 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -356,18 +356,39 @@
     public Void visitIntersectOperator(IntersectOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("\"operator\": \"intersect\",\n");
 
-        addIndent(indent).append("\"output-variables\": [");
-        appendVars(op.getOutputVars());
-        buffer.append("],");
-        addIndent(indent).append("\"input_variables\": [");
-
-        for (int i = 0; i < op.getNumInput(); i++) {
-            if (i > 0) {
-                buffer.append(",\n");
-            }
-            appendVars(op.getInputVariables(i));
+        addIndent(indent).append("\"output-compare-variables\": [");
+        appendVars(op.getOutputCompareVariables());
+        buffer.append(']');
+        if (op.hasExtraVariables()) {
+            buffer.append(",\n");
+            addIndent(indent).append("\"output-extra-variables\": [");
+            appendVars(op.getOutputExtraVariables());
+            buffer.append(']');
         }
-        buffer.append("]");
+        buffer.append(",\n");
+        addIndent(indent).append("\"input-compare-variables\": [");
+        for (int i = 0, n = op.getNumInput(); i < n; i++) {
+            if (i > 0) {
+                buffer.append(", ");
+            }
+            buffer.append('[');
+            appendVars(op.getInputCompareVariables(i));
+            buffer.append(']');
+        }
+        buffer.append(']');
+        if (op.hasExtraVariables()) {
+            buffer.append(",\n");
+            addIndent(indent).append("\"input-extra-variables\": [");
+            for (int i = 0, n = op.getNumInput(); i < n; i++) {
+                if (i > 0) {
+                    buffer.append(", ");
+                }
+                buffer.append('[');
+                appendVars(op.getInputExtraVariables(i));
+                buffer.append(']');
+            }
+            buffer.append(']');
+        }
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index 799e74b..a1ae42b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -306,26 +306,21 @@
     public String visitIntersectOperator(IntersectOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
         stringBuilder.append("intersect (");
-        stringBuilder.append('[');
-        for (int i = 0; i < op.getOutputVars().size(); i++) {
-            if (i > 0) {
-                stringBuilder.append(", ");
-            }
-            stringBuilder.append(str(op.getOutputVars().get(i)));
+        pprintVarList(op.getOutputCompareVariables());
+        if (op.hasExtraVariables()) {
+            stringBuilder.append(" extra ");
+            pprintVarList(op.getOutputExtraVariables());
         }
-        stringBuilder.append("] <- [");
-        for (int i = 0; i < op.getNumInput(); i++) {
+        stringBuilder.append(" <- [");
+        for (int i = 0, n = op.getNumInput(); i < n; i++) {
             if (i > 0) {
                 stringBuilder.append(", ");
             }
-            stringBuilder.append('[');
-            for (int j = 0; j < op.getInputVariables(i).size(); j++) {
-                if (j > 0) {
-                    stringBuilder.append(", ");
-                }
-                stringBuilder.append(str(op.getInputVariables(i).get(j)));
+            pprintVarList(op.getInputCompareVariables(i));
+            if (op.hasExtraVariables()) {
+                stringBuilder.append(" extra ");
+                pprintVarList(op.getInputExtraVariables(i));
             }
-            stringBuilder.append(']');
         }
         stringBuilder.append("])");
         appendSchema(op, showDetails);
@@ -659,6 +654,12 @@
         return stringBuilder.toString();
     }
 
+    protected void pprintVarList(List<LogicalVariable> variables) {
+        stringBuilder.append("[");
+        variables.forEach(var -> stringBuilder.append(str(var)).append(", "));
+        stringBuilder.append("]");
+    }
+
     private void printExprList(List<Mutable<ILogicalExpression>> expressions) {
         stringBuilder.append("[");
         expressions.forEach(exprRef -> stringBuilder.append(exprRef.getValue().toString()).append(", "));
