Intersect the 2ndary indexes before primary search

The following commits from your working branch will be included:

Change-Id: Ic16c67c529ca19d8b1a5439ddef22760945fd0d7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/577
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 7e9da44..977107c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -53,4 +53,5 @@
     UPDATE,
     WRITE,
     WRITE_RESULT,
+    INTERSECT,
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index c581e82..82d0b0e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -73,4 +73,5 @@
     UNNEST,
     UPDATE,
     WRITE_RESULT,
+    INTERSECT,
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
new file mode 100644
index 0000000..e64be2b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.FilteredVariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+
+public class IntersectOperator extends AbstractLogicalOperator {
+
+    private final List<List<LogicalVariable>> inputVars;
+    private final List<LogicalVariable> outputVars;
+
+    public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> inputVars)
+            throws AlgebricksException {
+        if (outputVars.size() != inputVars.get(0).size()) {
+            throw new AlgebricksException("The number of output variables is different with the input variable number");
+        }
+        if (inputVars.stream().anyMatch(vlist -> vlist.size() != outputVars.size())) {
+            throw new AlgebricksException("The schemas of input variables are not consistent");
+        }
+        this.outputVars = outputVars;
+        this.inputVars = inputVars;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.INTERSECT;
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = outputVars;
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitIntersectOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new FilteredVariablePropagationPolicy(outputVars);
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        IVariableTypeEnvironment typeEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());
+
+        for (int i = 1; i < inputs.size(); i++) {
+            checkTypeConsistency(typeEnv, inputVars.get(0), ctx.getOutputTypeEnvironment(inputs.get(i).getValue()),
+                    inputVars.get(i));
+        }
+
+        IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+                ctx.getMetadataProvider());
+        for (int i = 0; i < outputVars.size(); i++) {
+            env.setVarType(outputVars.get(i), typeEnv.getVarType(inputVars.get(0).get(i)));
+        }
+        return typeEnv;
+    }
+
+    public List<LogicalVariable> getOutputVars() {
+        return outputVars;
+    }
+
+    public int getNumInput() {
+        return inputVars.size();
+    }
+
+    public List<LogicalVariable> getInputVariables(int inputIndex) {
+        return inputVars.get(inputIndex);
+    }
+
+    private void checkTypeConsistency(IVariableTypeEnvironment expected, List<LogicalVariable> expectedVariables,
+            IVariableTypeEnvironment actual, List<LogicalVariable> actualVariables) throws AlgebricksException {
+        for (int i = 0; i < expectedVariables.size(); i++) {
+            Object expectedType = expected.getVarType(expectedVariables.get(i));
+            Object actualType = actual.getVarType(actualVariables.get(i));
+            if (!expectedType.equals(actualType)) {
+                AlgebricksConfig.ALGEBRICKS_LOGGER
+                        .warning("Type of two variables are not equal." + expectedVariables.get(i) + " is of type: "
+                                + expectedType + actualVariables.get(i) + " is of type: " + actualType);
+            }
+        }
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index b2c97c3..398b4d2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -59,6 +59,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -475,6 +476,12 @@
     }
 
     @Override
+    public Void visitIntersectOperator(IntersectOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext ctx) throws AlgebricksException {
         fdsEqClassesForAbstractUnnestOperator(op, ctx);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 7a4e7e1..eb6cd15 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -48,6 +48,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -331,6 +332,32 @@
     }
 
     @Override
+    public Boolean visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) throws AlgebricksException {
+        if (op.getOperatorTag() != LogicalOperatorTag.INTERSECT){
+            return Boolean.FALSE;
+        }
+        IntersectOperator intersetOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
+        List<LogicalVariable> variables = op.getOutputVars();
+        List<LogicalVariable> variablesArg = intersetOpArg.getOutputVars();
+        if (variables.size() != variablesArg.size()){
+            return Boolean.FALSE;
+        }
+        if (!VariableUtilities.varListEqualUnordered(variables, variablesArg)){
+            return Boolean.FALSE;
+        }
+
+        if (op.getNumInput() != intersetOpArg.getNumInput()){
+            return Boolean.FALSE;
+        }
+        for (int i = 0; i < op.getNumInput(); i++){
+            if (!VariableUtilities.varListEqualUnordered(op.getInputVariables(i), intersetOpArg.getInputVariables(i))){
+                return Boolean.FALSE;
+            }
+        }
+        return Boolean.TRUE;
+    }
+
+    @Override
     public Boolean visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.UNNEST)
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index c46ffde..7b0b944 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -49,6 +49,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -202,6 +203,13 @@
     }
 
     @Override
+    public Void visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapChildren(op, arg);
+        mapVariablesForIntersect(op, arg);
+        return null;
+    }
+
+    @Override
     public Void visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
@@ -428,6 +436,22 @@
         }
     }
 
+    private void mapVariablesForIntersect(IntersectOperator op, ILogicalOperator arg) {
+        IntersectOperator opArg = (IntersectOperator) arg;
+        if (op.getNumInput() != opArg.getNumInput()){
+            return;
+        }
+        for (int i = 0; i < op.getNumInput(); i++){
+            for (int j = 0; j < op.getInputVariables(i).size(); j++){
+                if (!varEquivalent(op.getInputVariables(i).get(j), opArg.getInputVariables(i).get(j))){
+                    return;
+                }
+            }
+
+        }
+        mapVariables(op.getOutputVars(), opArg.getOutputVars());
+    }
+
     private boolean varEquivalent(LogicalVariable left, LogicalVariable right) {
         if (variableMapping.get(right) == null)
             return false;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index b3f6639..9d83ae5 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -464,6 +465,33 @@
     }
 
     @Override
+    public ILogicalOperator visitIntersectOperator(IntersectOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        List<List<LogicalVariable>> liveVarsInInputs = getLiveVarsInInputs(op);
+        List<LogicalVariable> outputCopy = new ArrayList<>();
+        for (LogicalVariable var : op.getOutputVars()){
+            outputCopy.add(deepCopyVariable(var));
+        }
+        IntersectOperator opCopy = new IntersectOperator(outputCopy, liveVarsInInputs);
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    private List<List<LogicalVariable>> getLiveVarsInInputs(AbstractLogicalOperator op) throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> copiedInputs = new ArrayList<>();
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            copiedInputs.add(deepCopyOperatorReference(childRef, null));
+        }
+        List<List<LogicalVariable>> liveVarsInInputs = new ArrayList<>();
+        for (Mutable<ILogicalOperator> inputOpRef : copiedInputs) {
+            List<LogicalVariable> liveVars = new ArrayList<>();
+            VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars);
+            liveVarsInInputs.add(liveVars);
+        }
+        return liveVarsInInputs;
+    }
+
+    @Override
     public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         UnnestMapOperator opCopy = new UnnestMapOperator(deepCopyVariableList(op.getVariables()),
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 4e5b13e..9de0992 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -229,6 +230,11 @@
     }
 
     @Override
+    public Void visitIntersectOperator(IntersectOperator op, IOptimizationContext arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 2e402fc..8d436d1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -206,6 +207,16 @@
     }
 
     @Override
+    public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        List<LogicalVariable> outputVar = new ArrayList<>(op.getOutputVars());
+        List<List<LogicalVariable>> inputVars = new ArrayList<>(op.getNumInput());
+        for(int i = 0; i < op.getNumInput(); i++){
+            inputVars.add(new ArrayList<>(op.getInputVariables(i)));
+        }
+        return new IntersectOperator(outputVar, inputVars);
+    }
+
+    @Override
     public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
         return new UnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
                 op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 8df772b..d23ff94 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -204,6 +205,12 @@
     }
 
     @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        producedVariables.addAll(op.getOutputVars());
+        return null;
+    }
+
+    @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
         producedVariables.addAll(op.getVariables());
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index b488df1..5c6d3c3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -230,6 +231,12 @@
     }
 
     @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        VariableUtilities.getProducedVariables(op, schemaVariables);
+        return null;
+    }
+
+    @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
         if (op.propagatesInput()) {
             standardLayout(op);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 91ff073..d8e25f7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -303,6 +304,24 @@
     }
 
     @Override
+    public Void visitIntersectOperator(IntersectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (int i = 0; i < op.getOutputVars().size(); i++) {
+            if (op.getOutputVars().get(i).equals(pair.first)){
+                op.getOutputVars().set(i, pair.second);
+            }
+        }
+        for(int i = 0; i < op.getNumInput(); i++){
+            for (int j = 0; j < op.getInputVariables(i).size(); j++){
+                if (op.getInputVariables(i).get(j).equals(pair.first)){
+                    op.getInputVariables(i).set(j, pair.second);
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         List<LogicalVariable> variables = op.getVariables();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index cfc57c2..ef02feb 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -297,6 +298,18 @@
     }
 
     @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        for (int i = 0; i < op.getNumInput(); i++) {
+            for (LogicalVariable var : op.getInputVariables(i)) {
+                if (!usedVariables.contains(var)) {
+                    usedVariables.add(var);
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) {
         op.getExpressionRef().getValue().getUsedVariables(usedVariables);
         if (op.getAdditionalFilteringExpressions() != null) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
new file mode 100644
index 0000000..b6d0f1f
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.intersect.IntersectOperatorDescriptor;
+
+public class IntersectPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INTERSECT;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
+            IPhysicalPropertiesVector reqdByParent) {
+        IntersectOperator intersectOp = (IntersectOperator) iop;
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[intersectOp.getNumInput()];
+        for (int i = 0; i < intersectOp.getNumInput(); i++) {
+            List<ILocalStructuralProperty> localProps = new ArrayList<>();
+            List<OrderColumn> orderColumns = new ArrayList<>();
+            for (LogicalVariable column : intersectOp.getInputVariables(i)) {
+                orderColumns.add(new OrderColumn(column, OrderOperator.IOrder.OrderKind.ASC));
+            }
+            localProps.add(new LocalOrderProperty(orderColumns));
+            IPartitioningProperty pp = null;
+            if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+                pp = new RandomPartitioningProperty(null);
+            }
+            pv[i] = new StructuralPropertiesVector(pp, localProps);
+        }
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context)
+            throws AlgebricksException {
+        IntersectOperator op = (IntersectOperator) iop;
+        IPartitioningProperty pp = op.getInputs().get(0).getValue().getDeliveredPhysicalProperties()
+                .getPartitioningProperty();
+
+        HashMap<LogicalVariable, LogicalVariable> varMaps = new HashMap<>(op.getOutputVars().size());
+        for (int i = 0; i < op.getOutputVars().size(); i++) {
+            varMaps.put(op.getInputVariables(0).get(i), op.getOutputVars().get(i));
+        }
+        pp.substituteColumnVars(varMaps);
+
+        List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
+        List<OrderColumn> orderColumns = new ArrayList<>();
+        for (LogicalVariable var : op.getOutputVars()) {
+            orderColumns.add(new OrderColumn(var, OrderOperator.IOrder.OrderKind.ASC));
+        }
+        propsLocal.add(new LocalOrderProperty(orderColumns));
+        deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+                    throws AlgebricksException {
+        // logical op should have checked all the mismatch issues.
+        IntersectOperator logicalOp = (IntersectOperator) op;
+        int nInput = logicalOp.getNumInput();
+        int[][] compareFields = new int[nInput][];
+
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                logicalOp.getInputVariables(0), context.getTypeEnvironment(op), context);
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        if (nkcfProvider != null) {
+            Object type = context.getTypeEnvironment(op).getVarType(logicalOp.getInputVariables(0).get(0));
+            if (type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, true);
+            }
+        }
+
+        for (int i = 0; i < logicalOp.getNumInput(); i++) {
+            compareFields[i] = JobGenHelper.variablesToFieldIndexes(logicalOp.getInputVariables(i), inputSchemas[i]);
+        }
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+
+        IntersectOperatorDescriptor opDescriptor = null;
+        try {
+            opDescriptor = new IntersectOperatorDescriptor(spec, nInput, compareFields, nkcf, comparatorFactories,
+                    recordDescriptor);
+        } catch (HyracksException e) {
+            throw new AlgebricksException(e);
+        }
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDescriptor);
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            builder.contributeGraphEdge(op.getInputs().get(i).getValue(), 0, op, i);
+        }
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 4d51bf0..2c407b7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -75,18 +75,6 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        UnionAllOperator unionOp = (UnionAllOperator) op;
-        int n = unionOp.getVariableMappings().size();
-        int[] leftColumns = new int[n];
-        int[] rightColumns = new int[n];
-        int i = 0;
-        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : unionOp.getVariableMappings()) {
-            int posLeft = inputSchemas[0].findVariable(t.first);
-            leftColumns[i] = posLeft;
-            int posRight = inputSchemas[1].findVariable(t.second);
-            rightColumns[i] = posRight;
-            ++i;
-        }
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index e310ff9..0514fbd 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -253,6 +254,36 @@
     }
 
     @Override
+    public String visitIntersectOperator(IntersectOperator op, Integer indent) throws AlgebricksException {
+        StringBuilder builder = new StringBuilder();
+        addIndent(builder, indent).append("intersect (");
+
+        builder.append('[');
+        for (int i = 0; i < op.getOutputVars().size(); i++) {
+            if (i > 0) {
+                builder.append(", ");
+            }
+            builder.append(op.getOutputVars().get(i));
+        }
+        builder.append("] <- [");
+        for (int i = 0; i < op.getNumInput(); i++) {
+            if (i > 0) {
+                builder.append(", ");
+            }
+            builder.append('[');
+            for (int j = 0; j < op.getInputVariables(i).size(); j++) {
+                if (j > 0) {
+                    builder.append(", ");
+                }
+                builder.append(op.getInputVariables(i).get(j));
+            }
+            builder.append(']');
+        }
+        builder.append("])");
+        return builder.toString();
+    }
+
+    @Override
     public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("unnest " + op.getVariable());
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
index 5926a20..f3ba030 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
@@ -57,4 +57,8 @@
         this.domain = domain;
     }
 
+    @Override
+    public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+    }
+
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
index 3142d10..89ac374 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
@@ -26,11 +26,11 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 
 public interface IPartitioningProperty extends IStructuralProperty {
-    public enum PartitioningType {
+    enum PartitioningType {
         UNPARTITIONED, RANDOM, BROADCAST, UNORDERED_PARTITIONED, ORDERED_PARTITIONED
     }
 
-    static final INodeDomain DOMAIN_FOR_UNPARTITIONED_DATA = new INodeDomain() {
+    INodeDomain DOMAIN_FOR_UNPARTITIONED_DATA = new INodeDomain() {
         @Override
         public boolean sameAs(INodeDomain domain) {
             return domain == this;
@@ -42,7 +42,7 @@
         }
     };
 
-    public static final IPartitioningProperty UNPARTITIONED = new IPartitioningProperty() {
+    IPartitioningProperty UNPARTITIONED = new IPartitioningProperty() {
 
         @Override
         public PartitioningType getPartitioningType() {
@@ -72,14 +72,20 @@
         public void setNodeDomain(INodeDomain domain) {
             throw new IllegalStateException();
         }
+
+        @Override
+        public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> variableMap) {
+        }
     };
 
-    public abstract PartitioningType getPartitioningType();
+    PartitioningType getPartitioningType();
 
-    public abstract void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
+    void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
             List<FunctionalDependency> fds);
 
-    public abstract INodeDomain getNodeDomain();
+    INodeDomain getNodeDomain();
 
-    public abstract void setNodeDomain(INodeDomain domain);
+    void setNodeDomain(INodeDomain domain);
+
+    void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap);
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index f28bc56..5808da1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -81,4 +81,13 @@
         this.domain = domain;
     }
 
+    @Override
+    public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+        for (OrderColumn orderColumn : orderColumns){
+            if (varMap.containsKey(orderColumn.getColumn())){
+                orderColumn.setColumn(varMap.get(orderColumn.getColumn()));
+            }
+        }
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
index 24fe8e72..917fdd8 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
@@ -62,4 +62,8 @@
         this.domain = domain;
     }
 
+    @Override
+    public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+    }
+
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index de3b102..17e0cb3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -65,4 +65,13 @@
         this.domain = domain;
     }
 
+    @Override
+    public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+        for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()){
+            if (columnSet.remove(var.getKey())){
+                columnSet.add(var.getValue());
+            }
+        }
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index b5099b1..0aa6676 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -226,6 +226,12 @@
         return op.accept(visitor, null);
     }
 
+    public static ILogicalOperator deepCopyWithExcutionMode(ILogicalOperator op) throws AlgebricksException {
+        OperatorDeepCopyVisitor visitor = new OperatorDeepCopyVisitor();
+        AbstractLogicalOperator newOp = (AbstractLogicalOperator) op.accept(visitor, null);
+        newOp.setExecutionMode(op.getExecutionMode());
+        return newOp;
+    }
     /**
      * Compute type environment of a newly generated operator {@code op} and its input.
      *
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 53c8b69..6509e2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -95,6 +96,8 @@
 
     public R visitUnionOperator(UnionAllOperator op, T arg) throws AlgebricksException;
 
+    public R visitIntersectOperator(IntersectOperator op, T arg) throws AlgebricksException;
+
     public R visitUnnestOperator(UnnestOperator op, T arg) throws AlgebricksException;
 
     public R visitOuterUnnestOperator(OuterUnnestOperator op, T arg) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 186ac6f..1a61f2e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -76,18 +76,18 @@
         int n = op.getInputs().size();
         IOperatorSchema[] schemas = new IOperatorSchema[n];
         int i = 0;
-        for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
-            List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2);
+        for (Mutable<ILogicalOperator> opChild : op.getInputs()) {
+            List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opChild);
             if (parents == null) {
                 parents = new ArrayList<Mutable<ILogicalOperator>>();
-                operatorVisitedToParents.put(opRef2, parents);
+                operatorVisitedToParents.put(opChild, parents);
                 parents.add(opRef);
-                compileOpRef(opRef2, spec, builder, outerPlanSchema);
-                schemas[i++] = context.getSchema(opRef2.getValue());
+                compileOpRef(opChild, spec, builder, outerPlanSchema);
+                schemas[i++] = context.getSchema(opChild.getValue());
             } else {
                 if (!parents.contains(opRef))
                     parents.add(opRef);
-                schemas[i++] = context.getSchema(opRef2.getValue());
+                schemas[i++] = context.getSchema(opChild.getValue());
                 continue;
             }
         }
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index ae2ec51..20bc586 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -48,6 +48,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.10</version>
         <executions>
           <execution>
             <id>add-source</id>
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index e99d126..73bba8f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -64,6 +64,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.IntersectPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -249,7 +250,10 @@
                     op.setPhysicalOperator(new UnionAllPOperator());
                     break;
                 }
-
+                case INTERSECT: {
+                    op.setPhysicalOperator(new IntersectPOperator());
+                    break;
+                }
                 case UNNEST: {
                     op.setPhysicalOperator(new UnnestPOperator());
                     break;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java
index 3d3ed8db..8df1f38 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java
@@ -52,7 +52,8 @@
         ByteBuffer buffer = ByteBuffer.allocate(bytes);
         if (bytes / minFrameSize > FrameConstants.MAX_NUM_MINFRAME) {
             throw new HyracksDataException(
-                    "Unable to allocate frame larger than:" + FrameConstants.MAX_NUM_MINFRAME + " bytes");
+                    "Unable to allocate frame larger than:" + FrameConstants.MAX_NUM_MINFRAME * minFrameSize
+                            + " bytes");
         }
         FrameHelper.serializeFrameSize(buffer, (byte) (bytes / minFrameSize));
         return (ByteBuffer) buffer.clear();
@@ -74,8 +75,8 @@
             buffer.position(pos);
 
             if (newSizeInBytes / minFrameSize > FrameConstants.MAX_NUM_MINFRAME) {
-                throw new HyracksDataException("Unable to allocate frame of size bigger than MinFrameSize * "
-                        + FrameConstants.MAX_NUM_MINFRAME);
+                throw new HyracksDataException("Unable to allocate frame of size bigger than: "
+                        + FrameConstants.MAX_NUM_MINFRAME * minFrameSize + " bytes");
             }
             FrameHelper.serializeFrameSize(buffer, (byte) (newSizeInBytes / minFrameSize));
             return buffer;
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
index 21a7a71..e62e9e7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
@@ -49,18 +49,22 @@
         leftOverSize = 0;
     }
 
+    /**
+     * Reset frame states and copy the left over data into the new frame
+     *
+     * @param frame
+     * @throws HyracksDataException
+     */
+    public void resetWithLeftOverData(IFrame frame) throws HyracksDataException {
+        super.reset(frame, true);
+        copyLeftOverDataFromeBufferToFrame();
+    }
+
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         super.write(outWriter, clearFrame);
         if (clearFrame) {
-            if (leftOverSize > 0) {
-                if (!canHoldNewTuple(0, leftOverSize)) {
-                    throw new HyracksDataException(
-                            "The given frame can not be extended to insert the leftover data from the last record");
-                }
-                System.arraycopy(cachedLeftOverFields, 0, array, tupleDataEndOffset, leftOverSize);
-                leftOverSize = 0;
-            }
+            copyLeftOverDataFromeBufferToFrame();
         }
     }
 
@@ -85,13 +89,13 @@
             return true;
         } else {
             if (currentField > 0) {
-                copyLeftOverData();
+                copyLeftOverDataFromFrameToBuffer();
             }
             return false;
         }
     }
 
-    private void copyLeftOverData() {
+    private void copyLeftOverDataFromFrameToBuffer() {
         leftOverSize = lastFieldEndOffset + fieldCount * 4;
         if (cachedLeftOverFields == null || cachedLeftOverFields.length < leftOverSize) {
             cachedLeftOverFields = new byte[leftOverSize];
@@ -99,6 +103,17 @@
         System.arraycopy(array, tupleDataEndOffset, cachedLeftOverFields, 0, leftOverSize);
     }
 
+    private void copyLeftOverDataFromeBufferToFrame() throws HyracksDataException {
+        if (leftOverSize > 0) {
+            if (!canHoldNewTuple(0, leftOverSize)) {
+                throw new HyracksDataException(
+                        "The given frame can not be extended to insert the leftover data from the last record");
+            }
+            System.arraycopy(cachedLeftOverFields, 0, array, tupleDataEndOffset, leftOverSize);
+            leftOverSize = 0;
+        }
+    }
+
     public boolean appendField(IFrameTupleAccessor fta, int tIndex, int fIndex) throws HyracksDataException {
         int startOffset = fta.getTupleStartOffset(tIndex);
         int fStartOffset = fta.getFieldStartOffset(tIndex, fIndex);
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index 949ea38..b01789c 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -17,52 +17,52 @@
  ! under the License.
  !-->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>hyracks-dataflow-std</artifactId>
-  <name>hyracks-dataflow-std</name>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hyracks-dataflow-std</artifactId>
+    <name>hyracks-dataflow-std</name>
 
-  <parent>
-    <groupId>org.apache.hyracks</groupId>
-    <artifactId>hyracks</artifactId>
-    <version>0.2.17-SNAPSHOT</version>
-  </parent>
+    <parent>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks</artifactId>
+        <version>0.2.17-SNAPSHOT</version>
+    </parent>
 
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+            <comments>A business-friendly OSS license</comments>
+        </license>
+    </licenses>
 
 
-
-  <dependencies>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-api</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-dataflow-common</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>
-  	<dependency>
-  		<groupId>commons-io</groupId>
-  		<artifactId>commons-io</artifactId>
-  	</dependency>
-      <dependency>
-          <groupId>org.apache.hyracks</groupId>
-          <artifactId>hyracks-control-nc</artifactId>
-          <version>0.2.17-SNAPSHOT</version>
-          <scope>test</scope>
-      </dependency>
-  </dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-api</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-common</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-control-nc</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
new file mode 100644
index 0000000..98807eb
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.intersect;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivity;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+/**
+ * This intersection operator is to get the common elements from multiple way inputs.
+ * It will only produce the projected fields which are used for comparison.
+ */
+public class IntersectOperatorDescriptor extends AbstractOperatorDescriptor {
+
+    private final int[][] projectFields;
+    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    private final IBinaryComparatorFactory[] comparatorFactory;
+
+    /**
+     * @param spec
+     * @param nInputs                   Number of inputs
+     * @param compareAndProjectFields   The project field list of each input.
+     *                                  All the fields order should be the same with the comparatorFactories
+     * @param firstKeyNormalizerFactory Normalizer for the first comparison key.
+     * @param comparatorFactories       A list of comparators for each field
+     * @param recordDescriptor
+     * @throws HyracksException
+     */
+    public IntersectOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs, int[][] compareAndProjectFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor) throws HyracksException {
+        super(spec, nInputs, 1);
+        recordDescriptors[0] = recordDescriptor;
+
+        validateParameters(compareAndProjectFields, comparatorFactories);
+
+        this.projectFields = compareAndProjectFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.comparatorFactory = comparatorFactories;
+    }
+
+    private void validateParameters(int[][] compareAndProjectFields, IBinaryComparatorFactory[] comparatorFactories)
+            throws HyracksException {
+
+        int firstLength = compareAndProjectFields[0].length;
+        for (int[] fields : compareAndProjectFields) {
+            if (fields.length != firstLength) {
+                throw new HyracksException("The given input comparison fields is not equal");
+            }
+            for (int fid : fields) {
+                if (fid < 0) {
+                    throw new HyracksException("Invalid field index in given comparison fields array");
+                }
+            }
+        }
+
+        if (firstLength != comparatorFactories.length) {
+            throw new HyracksException("The size of given fields is not equal with the number of comparators");
+        }
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        IActivity intersectActivity = new IntersectActivity(new ActivityId(getOperatorId(), 0));
+        builder.addActivity(this, intersectActivity);
+        for (int i = 0; i < getInputArity(); i++) {
+            builder.addSourceEdge(i, intersectActivity, i);
+        }
+        builder.addTargetEdge(0, intersectActivity, 0);
+    }
+
+    private class IntersectActivity extends AbstractActivityNode {
+
+        public IntersectActivity(ActivityId activityId) {
+            super(activityId);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            RecordDescriptor[] inputRecordDesc = new RecordDescriptor[inputArity];
+            for (int i = 0; i < inputRecordDesc.length; i++) {
+                inputRecordDesc[i] = recordDescProvider.getInputRecordDescriptor(getActivityId(), i);
+            }
+            return new IntersectOperatorNodePushable(ctx, inputArity, inputRecordDesc, projectFields,
+                    firstKeyNormalizerFactory, comparatorFactory);
+        }
+    }
+
+    public static class IntersectOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
+
+        private enum ACTION {FAILED, CLOSE}
+
+        private final int inputArity;
+        private final int[][] projectFields;
+        private final BitSet consumed;
+        private final int[] tupleIndexMarker;
+        private final FrameTupleAccessor[] refAccessor;
+        private final FrameTupleAppender appender;
+
+        private final INormalizedKeyComputer firstKeyNormalizerComputer;
+        private final IBinaryComparator[] comparators;
+
+        private boolean done = false;
+
+        public IntersectOperatorNodePushable(IHyracksTaskContext ctx, int inputArity,
+                RecordDescriptor[] inputRecordDescriptors, int[][] projectFields,
+                INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactory)
+                throws HyracksDataException {
+
+            this.inputArity = inputArity;
+            this.projectFields = projectFields;
+            this.firstKeyNormalizerComputer =
+                    firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+            comparators = new IBinaryComparator[projectFields[0].length];
+            for (int i = 0; i < comparators.length; i++) {
+                comparators[i] = comparatorFactory[i].createBinaryComparator();
+            }
+
+            appender = new FrameTupleAppender(new VSizeFrame(ctx));
+
+            refAccessor = new FrameTupleAccessor[inputArity];
+            for (int i = 0; i < inputArity; i++) {
+                refAccessor[i] = new FrameTupleAccessor(inputRecordDescriptors[i]);
+            }
+
+            consumed = new BitSet(inputArity);
+            consumed.set(0, inputArity);
+            tupleIndexMarker = new int[inputArity];
+        }
+
+        @Override
+        public int getInputArity() {
+            return inputArity;
+        }
+
+        @Override
+        public IFrameWriter getInputFrameWriter(final int index) {
+            return new IFrameWriter() {
+                @Override
+                public void open() throws HyracksDataException {
+                    if (index == 0) {
+                        writer.open();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    synchronized (IntersectOperatorNodePushable.this) {
+                        if (done) {
+                            return;
+                        }
+                        refAccessor[index].reset(buffer);
+                        tupleIndexMarker[index] = 0;
+                        consumed.clear(index);
+                        if (index != 0) {
+                            if (allInputArrived()) {
+                                IntersectOperatorNodePushable.this.notifyAll();
+                            }
+                            while (!consumed.get(index) && !done) {
+                                waitOrHyracksException();
+                            }
+                        } else { //(index == 0)
+                            while (!consumed.get(0)) {
+                                while (!allInputArrived() && !done) {
+                                    waitOrHyracksException();
+                                }
+                                if (done) {
+                                    break;
+                                }
+                                intersectAllInputs();
+                                IntersectOperatorNodePushable.this.notifyAll();
+                            }
+                        }
+                    }
+                }
+
+                private void waitOrHyracksException() throws HyracksDataException {
+                    try {
+                        IntersectOperatorNodePushable.this.wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+
+                private boolean allInputArrived() {
+                    return consumed.cardinality() == 0;
+                }
+
+                private void intersectAllInputs() throws HyracksDataException {
+                    do {
+                        int maxInput = findMaxInput();
+                        int match = 1;
+                        boolean needToUpdateMax = false;
+                        for (int i = 0; i < inputArity; i++) {
+                            if (i == maxInput) {
+                                continue;
+                            }
+                            while (tupleIndexMarker[i] < refAccessor[i].getTupleCount()) {
+                                int cmp = compare(i, refAccessor[i], tupleIndexMarker[i], maxInput,
+                                        refAccessor[maxInput], tupleIndexMarker[maxInput]);
+                                if (cmp == 0) {
+                                    match++;
+                                    break;
+                                } else if (cmp < 0) {
+                                    tupleIndexMarker[i]++;
+                                } else {
+                                    needToUpdateMax = true;
+                                    break;
+                                }
+                            }
+
+                            if (tupleIndexMarker[i] >= refAccessor[i].getTupleCount()) {
+                                consumed.set(i);
+                            }
+                        }
+                        if (match == inputArity) {
+                            FrameUtils.appendProjectionToWriter(writer, appender, refAccessor[maxInput],
+                                    tupleIndexMarker[maxInput], projectFields[maxInput]);
+                            for (int i = 0; i < inputArity; i++) {
+                                tupleIndexMarker[i]++;
+                                if (tupleIndexMarker[i] >= refAccessor[i].getTupleCount()) {
+                                    consumed.set(i);
+                                }
+                            }
+                        } else if (needToUpdateMax) {
+                            tupleIndexMarker[maxInput]++;
+                            if (tupleIndexMarker[maxInput] >= refAccessor[maxInput].getTupleCount()) {
+                                consumed.set(maxInput);
+                            }
+                        }
+
+                    } while (consumed.nextSetBit(0) < 0);
+                    appender.write(writer, true);
+                }
+
+                private int compare(int input1, FrameTupleAccessor frameTupleAccessor1, int tid1, int input2,
+                        FrameTupleAccessor frameTupleAccessor2, int tid2) throws HyracksDataException {
+                    int firstNorm1 = getFirstNorm(input1, frameTupleAccessor1, tid1);
+                    int firstNorm2 = getFirstNorm(input2, frameTupleAccessor2, tid2);
+
+                    if (firstNorm1 < firstNorm2) {
+                        return -1;
+                    } else if (firstNorm1 > firstNorm2) {
+                        return 1;
+                    }
+
+                    for (int i = 0; i < comparators.length; i++) {
+                        int cmp = comparators[i].compare(frameTupleAccessor1.getBuffer().array(),
+                                frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, projectFields[input1][i]),
+                                frameTupleAccessor1.getFieldLength(tid1, projectFields[input1][i]),
+                                frameTupleAccessor2.getBuffer().array(),
+                                frameTupleAccessor2.getAbsoluteFieldStartOffset(tid2, projectFields[input2][i]),
+                                frameTupleAccessor2.getFieldLength(tid2, projectFields[input2][i]));
+
+                        if (cmp != 0) {
+                            return cmp;
+                        }
+                    }
+                    return 0;
+                }
+
+                private int getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1) {
+                    return firstKeyNormalizerComputer == null ?
+                            0 :
+                            firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
+                                    frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, projectFields[inputId1][0]),
+                                    frameTupleAccessor1.getFieldLength(tid1, projectFields[inputId1][0]));
+                }
+
+                private int findMaxInput() throws HyracksDataException {
+                    int max = 0;
+                    for (int i = 1; i < inputArity; i++) {
+                        int cmp = compare(max, refAccessor[max], tupleIndexMarker[max], i, refAccessor[i],
+                                tupleIndexMarker[i]);
+                        if (cmp < 0) {
+                            max = i;
+                        }
+                    }
+                    return max;
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    clearStateWith(ACTION.FAILED);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    clearStateWith(ACTION.CLOSE);
+                }
+
+                private void clearStateWith(ACTION action) throws HyracksDataException {
+                    synchronized (IntersectOperatorNodePushable.this) {
+                        if (index == 0) {
+                            doAction(action);
+                        }
+                        if (done) {
+                            return;
+                        }
+                        consumed.set(index);
+                        refAccessor[index] = null;
+                        done = true;
+                        IntersectOperatorNodePushable.this.notifyAll();
+                    }
+                }
+
+                private void doAction(ACTION action) throws HyracksDataException {
+                    switch (action) {
+                        case CLOSE:
+                            writer.close();
+                            break;
+                        case FAILED:
+                            writer.fail();
+                            break;
+                    }
+                }
+
+            };
+        }
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java
new file mode 100644
index 0000000..26c83ab
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/IntersectOperatorDescriptorTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.intersect.IntersectOperatorDescriptor;
+import org.apache.hyracks.tests.util.InputFrameGenerator;
+import org.apache.hyracks.tests.util.MultiThreadTaskEmulator;
+import org.apache.hyracks.tests.util.OutputFrameVerifier;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IntersectOperatorDescriptorTest {
+
+    IOperatorDescriptorRegistry mockRegistry = when(
+            mock(IOperatorDescriptorRegistry.class).createOperatorDescriptorId(any()))
+            .thenReturn(new OperatorDescriptorId(1)).getMock();
+    MultiThreadTaskEmulator multiThreadTaskEmulator = new MultiThreadTaskEmulator();
+    InputFrameGenerator frameGenerator = new InputFrameGenerator(256);
+    IHyracksTaskContext ctx = TestUtils.create(256);
+
+    int nInputs;
+    int nProjectFields;
+    int[][] compareFields;
+    RecordDescriptor[] inputRecordDescriptor;
+    INormalizedKeyComputerFactory normalizedKeyFactory;
+    IBinaryComparatorFactory[] comparatorFactory;
+    RecordDescriptor outRecordDescriptor;
+
+    protected void initializeParameters() {
+        compareFields = new int[nInputs][];
+
+        inputRecordDescriptor = new RecordDescriptor[nInputs];
+
+        normalizedKeyFactory = null;
+        comparatorFactory = new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY)
+        };
+
+        for (int i = 0; i < nInputs; i++) {
+            compareFields[i] = new int[nProjectFields];
+            for (int f = 0; f < nProjectFields; f++) {
+                compareFields[i][f] = f;
+            }
+        }
+        for (int i = 0; i < nInputs; i++) {
+            inputRecordDescriptor[i] = new RecordDescriptor(new ISerializerDeserializer[] {
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE
+            });
+        }
+
+        outRecordDescriptor = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE
+        });
+    }
+
+    @Before
+    public void setUpInput() {
+        nInputs = 3;
+        nProjectFields = 2;
+        initializeParameters();
+    }
+
+    @Test
+    public void testNormalOperatorInitialization() throws HyracksException {
+
+        IntersectOperatorDescriptor operatorDescriptor = new IntersectOperatorDescriptor(mockRegistry, nInputs,
+                compareFields, normalizedKeyFactory, comparatorFactory, outRecordDescriptor);
+
+        assertEquals(nInputs, operatorDescriptor.getInputArity());
+    }
+
+    @Test
+    public void testCommonIntersect() throws Exception {
+        List<Object[]> answer = new ArrayList<>();
+        List<IFrame>[] inputFrames = new ArrayList[nInputs];
+        prepareCommonDataFrame(inputFrames, answer);
+        executeAndVerifyResult(inputFrames, answer);
+    }
+
+    @Test
+    public void testNullOutputIntersect() throws Exception {
+        List<Object[]> answer = new ArrayList<>();
+        List<IFrame>[] inputFrames = new ArrayList[nInputs];
+        prepareNullResultDataFrame(inputFrames, answer);
+        executeAndVerifyResult(inputFrames, answer);
+    }
+
+    @Test
+    public void testOneInputIsVeryShortIntersect() throws Exception {
+        List<Object[]> answer = new ArrayList<>();
+        List<IFrame>[] inputFrames = new ArrayList[nInputs];
+        prepareOneInputIsVeryShortDataFrame(inputFrames, answer);
+        executeAndVerifyResult(inputFrames, answer);
+    }
+
+    @Test
+    public void testAllSameInputIntersect() throws Exception {
+        List<Object[]> answer = new ArrayList<>();
+        List<IFrame>[] inputFrames = new ArrayList[nInputs];
+        prepareAllSameInputDataFrame(inputFrames, answer);
+        executeAndVerifyResult(inputFrames, answer);
+    }
+
+    @Test
+    public void testOnlyOneInputIntersect() throws Exception {
+        nInputs = 1;
+        initializeParameters();
+        List<Object[]> answer = new ArrayList<>();
+        List<IFrame>[] inputFrames = new ArrayList[nInputs];
+        prepareAllSameInputDataFrame(inputFrames, answer);
+        executeAndVerifyResult(inputFrames, answer);
+    }
+
+    private void executeAndVerifyResult(List<IFrame>[] inputFrames, List<Object[]> answer) throws Exception {
+        IntersectOperatorDescriptor.IntersectOperatorNodePushable pushable =
+                new IntersectOperatorDescriptor.IntersectOperatorNodePushable(ctx, nInputs, inputRecordDescriptor,
+                        compareFields, null, comparatorFactory);
+        assertEquals(nInputs, pushable.getInputArity());
+
+        IFrameWriter[] writers = new IFrameWriter[nInputs];
+        for (int i = 0; i < nInputs; i++) {
+            writers[i] = pushable.getInputFrameWriter(i);
+        }
+        IFrameWriter resultVerifier = new OutputFrameVerifier(outRecordDescriptor, answer);
+        pushable.setOutputFrameWriter(0, resultVerifier, outRecordDescriptor);
+        multiThreadTaskEmulator.runInParallel(writers, inputFrames);
+    }
+
+    protected void prepareCommonDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer)
+            throws HyracksDataException {
+        for (int i = 0; i < nInputs; i++) {
+            List<Object[]> inputObjects = new ArrayList<>();
+            generateRecordStream(inputObjects, inputRecordDescriptor[i], i + 1, (i + 1) * 100, 1);
+            inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects);
+        }
+        generateRecordStream(answer, outRecordDescriptor, nInputs, 100, 1);
+    }
+
+    protected void prepareNullResultDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer)
+            throws HyracksDataException {
+        for (int i = 0; i < nInputs; i++) {
+            List<Object[]> inputObjects = new ArrayList<>();
+            generateRecordStream(inputObjects, inputRecordDescriptor[i], (i + 1) * 100, (i + 2) * 100, 1);
+            inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects);
+        }
+    }
+
+    protected void prepareOneInputIsVeryShortDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer)
+            throws HyracksDataException {
+        for (int i = 0; i < nInputs; i++) {
+            List<Object[]> inputObjects = new ArrayList<>();
+            generateRecordStream(inputObjects, inputRecordDescriptor[i], i, i * 100 + 1, 1);
+            inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects);
+        }
+    }
+
+    protected void prepareAllSameInputDataFrame(List<IFrame>[] inputFrames, List<Object[]> answer)
+            throws HyracksDataException {
+        for (int i = 0; i < nInputs; i++) {
+            List<Object[]> inputObjects = new ArrayList<>();
+            generateRecordStream(inputObjects, inputRecordDescriptor[i], 0, 100, 1);
+            inputFrames[i] = frameGenerator.generateDataFrame(inputRecordDescriptor[i], inputObjects);
+        }
+        generateRecordStream(answer, outRecordDescriptor, 0, 100, 1);
+    }
+
+    private void generateRecordStream(List<Object[]> inputs, RecordDescriptor recordDesc,
+            int start, int end, int step) {
+        for (int i = start; i < end; i += step) {
+            Object[] obj = new Object[recordDesc.getFieldCount()];
+            for (int f = 0; f < recordDesc.getFieldCount(); f++) {
+                obj[f] = i;
+            }
+            inputs.add(obj);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/InputFrameGenerator.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/InputFrameGenerator.java
new file mode 100644
index 0000000..d37bfd9
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/InputFrameGenerator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldAppender;
+
+public class InputFrameGenerator {
+
+    protected final FrameManager manager;
+
+    public InputFrameGenerator(int initialFrameSize) {
+        manager = new FrameManager(initialFrameSize);
+    }
+
+    public List<IFrame> generateDataFrame(RecordDescriptor recordDescriptor, List<Object[]> listOfObject)
+            throws HyracksDataException {
+        List<IFrame> listFrame = new ArrayList<>();
+        VSizeFrame frame = new VSizeFrame(manager);
+        FrameFixedFieldAppender appender = new FrameFixedFieldAppender(recordDescriptor.getFieldCount());
+        appender.reset(frame, true);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(manager.getInitialFrameSize());
+        DataOutputStream ds = new DataOutputStream(baos);
+        for (Object[] objs : listOfObject) {
+            for (int i = 0; i < recordDescriptor.getFieldCount(); i++) {
+                baos.reset();
+                recordDescriptor.getFields()[i].serialize(objs[i], ds);
+                if (!appender.appendField(baos.toByteArray(), 0, baos.size())) {
+                    listFrame.add(frame);
+                    frame = new VSizeFrame(manager);
+                    appender.resetWithLeftOverData(frame);
+                    if (!appender.appendField(baos.toByteArray(), 0, baos.size())) {
+                        throw new HyracksDataException("Should never happen!");
+                    }
+                }
+            }
+        }
+        listFrame.add(frame);
+        return listFrame;
+    }
+
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java
new file mode 100644
index 0000000..022d3a2
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/MultiThreadTaskEmulator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+
+public class MultiThreadTaskEmulator {
+
+    private ExecutorService executor;
+
+    public MultiThreadTaskEmulator() {
+        this.executor = Executors.newCachedThreadPool((r) -> {
+            Thread t = new Thread(r);
+            t.setDaemon(true);
+            return t;
+        });
+    }
+
+    public void runInParallel(final IFrameWriter[] writers, final List<IFrame>[] inputFrames) throws Exception {
+        final Semaphore sem = new Semaphore(writers.length - 1);
+        List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
+        for (int i = 1; i < writers.length; i++) {
+            sem.acquire();
+            final IFrameWriter writer = writers[i];
+            final List<IFrame> inputFrame = inputFrames[i];
+            executor.execute(() -> {
+                executeOneWriter(writer, inputFrame, exceptions);
+                sem.release();
+            });
+        }
+
+        final IFrameWriter writer = writers[0];
+        final List<IFrame> inputFrame = inputFrames[0];
+        executeOneWriter(writer, inputFrame, exceptions);
+        sem.acquire(writers.length - 1);
+
+        for (int i = 0; i < exceptions.size(); i++) {
+            exceptions.get(i).printStackTrace();
+            if (i == exceptions.size() - 1) {
+                throw exceptions.get(i);
+            }
+        }
+    }
+
+    private void executeOneWriter(IFrameWriter writer, List<IFrame> inputFrame, List<Exception> exceptions) {
+        try {
+            try {
+                writer.open();
+                for (IFrame frame : inputFrame) {
+                    writer.nextFrame(frame.getBuffer());
+                }
+            } catch (Exception ex) {
+                writer.fail();
+                throw ex;
+            } finally {
+                writer.close();
+            }
+        } catch (Exception e) {
+            exceptions.add(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/OutputFrameVerifier.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/OutputFrameVerifier.java
new file mode 100644
index 0000000..77b6913
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/OutputFrameVerifier.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class OutputFrameVerifier implements IFrameWriter {
+
+    private final RecordDescriptor inputRecordDescriptor;
+    private final List<Object[]> answerList;
+    private final FrameTupleAccessor frameAccessor;
+    private int offset;
+    private boolean failed;
+
+    public OutputFrameVerifier(RecordDescriptor inputRecordDescriptor, List<Object[]> answerList) {
+        this.inputRecordDescriptor = inputRecordDescriptor;
+        this.frameAccessor = new FrameTupleAccessor(inputRecordDescriptor);
+        this.answerList = answerList;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        this.offset = 0;
+        this.failed = false;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        frameAccessor.reset(buffer);
+        for (int tid = 0; tid < frameAccessor.getTupleCount(); tid++) {
+            Object[] objects = new Object[inputRecordDescriptor.getFieldCount()];
+            for (int fid = 0; fid < inputRecordDescriptor.getFieldCount(); fid++) {
+                ByteArrayInputStream bais = new ByteArrayInputStream(frameAccessor.getBuffer().array(),
+                        frameAccessor.getAbsoluteFieldStartOffset(tid, fid),
+                        frameAccessor.getFieldLength(tid, fid));
+                DataInputStream dis = new DataInputStream(bais);
+                objects[fid] = inputRecordDescriptor.getFields()[fid].deserialize(dis);
+            }
+            if (offset >= answerList.size()) {
+                throw new HyracksDataException(
+                        "The number of given results is more than expected size:" + answerList.size());
+            }
+            Object[] expected = answerList.get(offset);
+            for (int i = 0; i < expected.length; i++) {
+                if (!expected[i].equals(objects[i])) {
+                    throw new HyracksDataException(
+                            "The result object: " + objects[i] + " is different from the expected one:" + expected[i]);
+                }
+            }
+            offset++;
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        this.failed = true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (offset < answerList.size()) {
+            throw new HyracksDataException(
+                    "The number of given results:" + offset + " is less than expected size:" + answerList.size());
+        }
+    }
+
+    public boolean isFailed() {
+        return failed;
+    }
+}
diff --git a/hyracks/hyracks-test-support/pom.xml b/hyracks/hyracks-test-support/pom.xml
index f0ec043..404885f 100644
--- a/hyracks/hyracks-test-support/pom.xml
+++ b/hyracks/hyracks-test-support/pom.xml
@@ -37,7 +37,6 @@
     </license>
   </licenses>
 
-
   <dependencies>
     <dependency>
       <groupId>junit</groupId>