ASTERIXDB-1343: support heterogeneity of computation nodes and storage nodes.

Change-Id: Ic21d8da2cd457aa17cc9861c0b92ac5960978e03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/718
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 0d899b9..1d3a55c 100644
--- a/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.compiler.api;
 
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -51,11 +52,11 @@
                 IExpressionEvalSizeComputer expressionEvalSizeComputer,
                 IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
                 IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
-                PhysicalOptimizationConfig physicalOptimizationConfig) {
+                PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
             LogicalOperatorPrettyPrintVisitor prettyPrintVisitor = new LogicalOperatorPrettyPrintVisitor();
             return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
                     mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer,
-                    physicalOptimizationConfig, prettyPrintVisitor);
+                    physicalOptimizationConfig, clusterLocations, prettyPrintVisitor);
         }
     }
 
@@ -77,7 +78,7 @@
                     int varCounter) {
                 final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
                         expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                        nullableTypeComputer, physicalOptimizationConfig);
+                        nullableTypeComputer, physicalOptimizationConfig, clusterLocations);
                 oc.setMetadataDeclarations(metadata);
                 final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc);
                 return new ICompiler() {
@@ -92,13 +93,13 @@
                             IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
                         AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
                         JobGenContext context = new JobGenContext(null, metadata, appContext,
-                                serializerDeserializerProvider, hashFunctionFactoryProvider,
-                                hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider,
-                                binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
-                                nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
-                                expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
-                                partialAggregationTypeComputer, predEvaluatorFactoryProvider,
-                                physicalOptimizationConfig.getFrameSize(), clusterLocations);
+                                serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider,
+                                comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory,
+                                binaryIntegerInspectorFactory, printerProvider, nullWriterFactory,
+                                normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer,
+                                nullableTypeComputer, oc, expressionEvalSizeComputer, partialAggregationTypeComputer,
+                                predEvaluatorFactoryProvider, physicalOptimizationConfig.getFrameSize(),
+                                clusterLocations);
 
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, null, jobEventListenerFactory);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 9d68fae..707a7db 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -66,7 +65,7 @@
 
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     // variables
 
@@ -88,7 +87,8 @@
      * @return for each child, one vector of required physical properties
      */
 
-    public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties);
+    public PhysicalRequirements getRequiredPhysicalPropertiesForChildren(IPhysicalPropertiesVector requiredProperties,
+            IOptimizationContext context);
 
     /**
      * @return the physical properties that this operator delivers, based on
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 6badac7..328697d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
@@ -35,52 +36,54 @@
 public interface IOptimizationContext extends ITypingContext, IVariableContext {
 
     @Override
-    public abstract IMetadataProvider<?, ?> getMetadataProvider();
+    public IMetadataProvider<?, ?> getMetadataProvider();
 
-    public abstract void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
+    public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
 
-    public abstract boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
+    public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
 
-    public abstract void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
+    public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op);
 
     /*
      * returns true if op1 and op2 have already been compared
      */
-    public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
+    public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
 
-    public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
+    public void removeFromAlreadyCompared(ILogicalOperator op1);
 
-    public abstract void addNotToBeInlinedVar(LogicalVariable var);
+    public void addNotToBeInlinedVar(LogicalVariable var);
 
-    public abstract boolean shouldNotBeInlined(LogicalVariable var);
+    public boolean shouldNotBeInlined(LogicalVariable var);
 
-    public abstract void addPrimaryKey(FunctionalDependency pk);
+    public void addPrimaryKey(FunctionalDependency pk);
 
-    public abstract List<LogicalVariable> findPrimaryKey(LogicalVariable var);
+    public List<LogicalVariable> findPrimaryKey(LogicalVariable var);
 
-    public abstract void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap);
+    public void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap);
 
-    public abstract Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op);
+    public Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op);
 
-    public abstract void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList);
+    public void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList);
 
-    public abstract List<FunctionalDependency> getFDList(ILogicalOperator op);
+    public List<FunctionalDependency> getFDList(ILogicalOperator op);
 
     public void clearAllFDAndEquivalenceClasses();
 
-    public abstract void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
+    public void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
 
-    public abstract ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
+    public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
 
-    public abstract IExpressionEvalSizeComputer getExpressionEvalSizeComputer();
+    public IExpressionEvalSizeComputer getExpressionEvalSizeComputer();
 
-    public abstract IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment();
+    public IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment();
 
-    public abstract IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory();
+    public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory();
 
-    public abstract PhysicalOptimizationConfig getPhysicalOptimizationConfig();
+    public PhysicalOptimizationConfig getPhysicalOptimizationConfig();
 
-    public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
+    public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
 
-    public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+    public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
+
+    public INodeDomain getComputationNodeDomain();
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index a3ef5d9..8c0ab2f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -35,10 +35,12 @@
      * @param reqdByParent
      *            parent's requirements, which are not enforced for now, as we
      *            only explore one plan
+     * @param context
+     *            the optimization context
      * @return for each child, one vector of required physical properties
      */
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent);
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context);
 
     /**
      * @return the physical properties that this operator delivers, based on
@@ -51,7 +53,7 @@
 
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public void disableJobGenBelowMe();
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 5e9eef6..2410ca0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -102,8 +102,8 @@
 
     @Override
     public final PhysicalRequirements getRequiredPhysicalPropertiesForChildren(
-            IPhysicalPropertiesVector requiredProperties) {
-        return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties);
+            IPhysicalPropertiesVector requiredProperties, IOptimizationContext context) {
+        return physicalOperator.getRequiredPropertiesForChildren(this, requiredProperties, context);
     }
 
     /**
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 09d2253..7077014 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -86,7 +86,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
         // In a cost-based optimizer, we would also try to propagate the
         // parent's partitioning requirements.
@@ -97,12 +97,14 @@
         if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
             switch (partitioningType) {
                 case PAIRWISE: {
-                    pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch), null);
-                    pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch), null);
+                    pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch),
+                            context.getComputationNodeDomain());
+                    pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch),
+                            context.getComputationNodeDomain());
                     break;
                 }
                 case BROADCAST: {
-                    pp2 = new BroadcastPartitioningProperty(null);
+                    pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain());
                     break;
                 }
                 default: {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 2c93cd4..2d00e4c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -138,7 +138,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
         List<ILocalStructuralProperty> localProps = null;
 
@@ -233,7 +233,8 @@
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
-            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList),
+                    context.getComputationNodeDomain());
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 2ab24d2..5159ac5 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 
@@ -26,7 +27,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index 4b13645..3759956 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -69,7 +69,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
         if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
             if (orderProp == null) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 2a2eb1e..3242fa0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -72,7 +72,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AggregateOperator aggOp = (AggregateOperator) op;
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
         if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 5b6c40a..5aed63e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -63,7 +63,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
index 03a8666..69bfe2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
@@ -61,7 +61,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 8ac3271..dda5456 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -72,7 +72,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 5823bc0..b3e8385 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -68,7 +68,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         DistributeResultOperator write = (DistributeResultOperator) op;
         IDataSink sink = write.getDataSink();
         IPartitioningProperty pp = sink.getPartitioningProperty();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
index 4e3a5bc..dcb7e15 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
@@ -56,7 +56,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return null;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index b15ea0b..7772620 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -135,12 +135,12 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-            pv[0] = new StructuralPropertiesVector(
-                    new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnSet), null), null);
+            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
+                    new ListSet<LogicalVariable>(columnSet), context.getComputationNodeDomain()), null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
@@ -206,13 +206,16 @@
         }
 
         List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList())
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
             keyAndDecVariables.add(p.first);
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList())
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getDecorList()) {
             keyAndDecVariables.add(GroupByOperator.getDecorVariable(p));
+        }
 
-        for (LogicalVariable var : keyAndDecVariables)
+        for (LogicalVariable var : keyAndDecVariables) {
             aggOpInputEnv.setVarType(var, outputEnv.getVarType(var));
+        }
 
         compileSubplans(inputSchemas[0], gby, opSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -236,10 +239,12 @@
         for (Object type : intermediateTypes) {
             aggOpInputEnv.setVarType(usedVars.get(i++), type);
         }
-        for (LogicalVariable keyVar : keyAndDecVariables)
+        for (LogicalVariable keyVar : keyAndDecVariables) {
             localInputSchemas[0].addVariable(keyVar);
-        for (LogicalVariable usedVar : usedVars)
+        }
+        for (LogicalVariable usedVar : usedVars) {
             localInputSchemas[0].addVariable(usedVar);
+        }
         for (i = 0; i < n; i++) {
             AggregateFunctionCallExpression mergeFun = (AggregateFunctionCallExpression) aggOp.getMergeExpressions()
                     .get(i).getValue();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 0ff1e47..34c707b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -76,7 +76,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index 21be272..17322b6 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -100,7 +100,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
         List<OrderColumn> columns = new ArrayList<OrderColumn>();
         for (OrderColumn oc : orderColumns) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index b837bfa..50ab6aa 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -83,7 +83,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 4702361..f29fd6f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -87,7 +87,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(primaryKeys);
         scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index 2e4b647..f0bd603 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -80,7 +80,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(keys);
         scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
index b6d0f1f..5f43c3e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -21,7 +21,9 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
@@ -40,8 +42,8 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
@@ -61,7 +63,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         IntersectOperator intersectOp = (IntersectOperator) iop;
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[intersectOp.getNumInput()];
         for (int i = 0; i < intersectOp.getNumInput(); i++) {
@@ -73,7 +75,8 @@
             localProps.add(new LocalOrderProperty(orderColumns));
             IPartitioningProperty pp = null;
             if (intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-                pp = new RandomPartitioningProperty(null);
+                Set<LogicalVariable> partitioningVariables = new HashSet<>(intersectOp.getInputVariables(i));
+                pp = new UnorderedPartitionedProperty(partitioningVariables, null);
             }
             pv[i] = new StructuralPropertiesVector(pp, localProps);
         }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
index a154d91..c55a4ae 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -55,7 +55,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 7ff15d7..2622ae3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -113,13 +113,14 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         if (partitioningType != JoinPartitioningType.BROADCAST) {
             throw new NotImplementedException(partitioningType + " nested loop joins are not implemented.");
         }
 
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
-        pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+        pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(context.getComputationNodeDomain()),
+                null);
         pv[1] = new StructuralPropertiesVector(null, null);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
@@ -225,10 +226,11 @@
             }
             boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
                     p.getLength());
-            if (result)
+            if (result) {
                 return 0;
-            else
+            } else {
                 return 1;
+            }
         }
     }
 
@@ -256,28 +258,31 @@
         @Override
         public byte[] getFieldData(int fIdx) {
             int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount)
+            if (fIdx < leftFieldCount) {
                 return refLeft.getFieldData(fIdx);
-            else
+            } else {
                 return refRight.getFieldData(fIdx - leftFieldCount);
+            }
         }
 
         @Override
         public int getFieldStart(int fIdx) {
             int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount)
+            if (fIdx < leftFieldCount) {
                 return refLeft.getFieldStart(fIdx);
-            else
+            } else {
                 return refRight.getFieldStart(fIdx - leftFieldCount);
+            }
         }
 
         @Override
         public int getFieldLength(int fIdx) {
             int leftFieldCount = refLeft.getFieldCount();
-            if (fIdx < leftFieldCount)
+            if (fIdx < leftFieldCount) {
                 return refLeft.getFieldLength(fIdx);
-            else
+            } else {
                 return refRight.getFieldLength(fIdx - leftFieldCount);
+            }
         }
 
         @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index 7535c82..e8cc05f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -89,7 +89,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return null;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
index 27a5357..818e1ec 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/OneToOneExchangePOperator.java
@@ -51,7 +51,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index df65906..fe11f64 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -78,7 +78,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
         List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>();
         List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
@@ -89,7 +89,8 @@
         IPartitioningProperty pp = null;
         AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
-            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList),
+                    context.getComputationNodeDomain());
         }
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -98,7 +99,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
 
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchemas[0]);
@@ -119,11 +120,11 @@
             keysAndDecs[i + keys.length] = fdColumns[i];
         }
 
-        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
-                columnList, context.getTypeEnvironment(op), context);
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper
+                .variablesToAscBinaryComparatorFactories(columnList, context.getTypeEnvironment(op), context);
         IAggregateEvaluatorFactory[] aggFactories = new IAggregateEvaluatorFactory[] {};
-        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(
-                aggFactories, keysAndDecs);
+        IAggregatorDescriptorFactory aggregatorFactory = new SimpleAlgebricksAccumulatingAggregatorFactory(aggFactories,
+                keysAndDecs);
 
         RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
                 context);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
index bfe8b36..e11a64f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomMergeExchangePOperator.java
@@ -51,7 +51,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
index af0087d..7237d24 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -85,7 +85,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
index d47c31f..1f70f2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -105,7 +105,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
         List<OrderColumn> columns = new ArrayList<OrderColumn>();
         for (OrderColumn oc : partitioningFields) {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index dca3d97..9046ce2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -85,7 +85,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 19fb5d4..14a8f16 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -50,7 +50,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 2c842e3..8e4ca18 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -63,7 +63,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 6246ad2..71acecf 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -75,7 +75,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
       return emptyUnaryRequirements(op.getInputs().size());
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index 0b100ee..35f9444 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -67,7 +67,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         WriteOperator write = (WriteOperator) op;
         IDataSink sink = write.getDataSink();
         IPartitioningProperty pp = sink.getPartitioningProperty();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index 7465711..c08ff85 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -138,7 +138,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
index f264284..81f6e6b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortMergeExchangePOperator.java
@@ -121,7 +121,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<ILocalStructuralProperty> localProps = new ArrayList<ILocalStructuralProperty>(sortColumns.length);
         localProps.add(new LocalOrderProperty(Arrays.asList(sortColumns)));
         StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index f47e671..99be356 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -71,7 +71,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
         if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 326172f..184cbbc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -49,7 +49,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index 2c40b3b..9b35922 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -54,7 +54,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index bd46230..1f5159d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -49,7 +49,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index aa5672e..9fc0dd4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -77,7 +77,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index 98427fe..557a657 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -66,7 +66,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 2c407b7..e6517d0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -64,7 +64,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         StructuralPropertiesVector pv0 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
         StructuralPropertiesVector pv1 = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
         return new PhysicalRequirements(new StructuralPropertiesVector[] { pv0, pv1 },
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 9c8ddc3..7ec1914 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -80,7 +80,7 @@
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
         List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
         scanVariables.addAll(keys);
         scanVariables.add(new LogicalVariable(-1));
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 908dcc1..719c70e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,26 +18,53 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.properties;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
+
 public class DefaultNodeGroupDomain implements INodeDomain {
 
-    private String groupName;
+    private List<String> nodes = new ArrayList<>();
 
-    public DefaultNodeGroupDomain(String groupName) {
-        this.groupName = groupName;
+    public DefaultNodeGroupDomain(List<String> nodes) {
+        this.nodes.addAll(nodes);
+    }
+
+    public DefaultNodeGroupDomain(DefaultNodeGroupDomain domain) {
+        this.nodes.addAll(domain.nodes);
+    }
+
+    public DefaultNodeGroupDomain(AlgebricksPartitionConstraint clusterLocations) {
+        if (clusterLocations.getPartitionConstraintType() == PartitionConstraintType.ABSOLUTE) {
+            AlgebricksAbsolutePartitionConstraint absPc = (AlgebricksAbsolutePartitionConstraint) clusterLocations;
+            String[] locations = absPc.getLocations();
+            for (String location : locations) {
+                nodes.add(location);
+            }
+        } else {
+            throw new IllegalStateException("A node domain can only take absolute location constraints.");
+        }
     }
 
     @Override
     public boolean sameAs(INodeDomain domain) {
-        return true;
+        if (!(domain instanceof DefaultNodeGroupDomain)) {
+            return false;
+        }
+        DefaultNodeGroupDomain nodeDomain = (DefaultNodeGroupDomain) domain;
+        return nodes.equals(nodeDomain.nodes);
     }
 
     @Override
     public String toString() {
-        return "AsterixDomain(" + groupName + ")";
+        return nodes.toString();
     }
 
     @Override
     public Integer cardinality() {
-        return null;
+        return nodes.size();
     }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
index d872491..e298691 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPhysicalPropertiesVector.java
@@ -31,7 +31,6 @@
     public List<ILocalStructuralProperty> getLocalProperties();
 
     /**
-     *
      * @param reqd
      *            vector of required properties
      * @param equivalenceClasses
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 5808da1..44df740 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -83,8 +83,8 @@
 
     @Override
     public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
-        for (OrderColumn orderColumn : orderColumns){
-            if (varMap.containsKey(orderColumn.getColumn())){
+        for (OrderColumn orderColumn : orderColumns) {
+            if (varMap.containsKey(orderColumn.getColumn())) {
                 orderColumn.setColumn(varMap.get(orderColumn.getColumn()));
             }
         }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index 711c0e7..0b8d759 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -55,9 +55,8 @@
         return k;
     }
 
-    public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd,
-            List<ILocalStructuralProperty> dlvd, Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
-            List<FunctionalDependency> fds) {
+    public static boolean matchLocalProperties(List<ILocalStructuralProperty> reqd, List<ILocalStructuralProperty> dlvd,
+            Map<LogicalVariable, EquivalenceClass> equivalenceClasses, List<FunctionalDependency> fds) {
         if (reqd == null) {
             return true;
         }
@@ -118,7 +117,7 @@
             boolean mayExpandProperties) {
         INodeDomain dom1 = reqd.getNodeDomain();
         INodeDomain dom2 = dlvd.getNodeDomain();
-        if (dom1 != null && dom2 != null && !dom1.sameAs(dom2)) {
+        if (!dom1.sameAs(dom2)) {
             return false;
         }
 
@@ -132,10 +131,11 @@
                     case UNORDERED_PARTITIONED: {
                         UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
                         UnorderedPartitionedProperty ud = (UnorderedPartitionedProperty) dlvd;
-                        if (mayExpandProperties)
+                        if (mayExpandProperties) {
                             return (!ud.getColumnSet().isEmpty() && ur.getColumnSet().containsAll(ud.getColumnSet()));
-                        else
+                        } else {
                             return (ud.getColumnSet().equals(ur.getColumnSet()));
+                        }
                     }
                     case ORDERED_PARTITIONED: {
                         UnorderedPartitionedProperty ur = (UnorderedPartitionedProperty) reqd;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
index 7ae7e62..2cad1cb 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/StructuralPropertiesVector.java
@@ -33,7 +33,8 @@
     public static final StructuralPropertiesVector EMPTY_PROPERTIES_VECTOR = new StructuralPropertiesVector(null,
             new ArrayList<ILocalStructuralProperty>());
 
-    public StructuralPropertiesVector(IPartitioningProperty propPartitioning, List<ILocalStructuralProperty> propsLocal) {
+    public StructuralPropertiesVector(IPartitioningProperty propPartitioning,
+            List<ILocalStructuralProperty> propsLocal) {
         this.propPartitioning = propPartitioning;
         this.propsLocal = propsLocal;
     }
@@ -63,7 +64,6 @@
     }
 
     /**
-     *
      * @param reqd
      *            vector of required properties
      * @return a vector of properties from pvector that are not delivered by the
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index 17e0cb3..d6a364a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -67,8 +67,8 @@
 
     @Override
     public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
-        for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()){
-            if (columnSet.remove(var.getKey())){
+        for (Map.Entry<LogicalVariable, LogicalVariable> var : varMap.entrySet()) {
+            if (columnSet.remove(var.getKey())) {
                 columnSet.add(var.getValue());
             }
         }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index ac2ae5c..4671a71 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -228,19 +228,13 @@
             }
         }
         if (partitionConstraintMap.get(opDesc) == null) {
-            if (opConstraint == null) {
-                if (parentOp != null) {
-                    AlgebricksPartitionConstraint pc = partitionConstraintMap.get(parentOp);
-                    if (pc != null) {
-                        opConstraint = pc;
-                    } else if ((opInputs == null || opInputs.size() == 0) && finalPass) {
-                        opConstraint = new AlgebricksCountPartitionConstraint(1);
-                    }
-                }
-                if (opConstraint == null && finalPass) {
-                    opConstraint = clusterLocations;
-                }
+            if (finalPass && opConstraint == null && (opInputs == null || opInputs.size() == 0)) {
+                opConstraint = new AlgebricksCountPartitionConstraint(1);
             }
+            if (finalPass && opConstraint == null) {
+                opConstraint = clusterLocations;
+            }
+            // Sets up the location constraint.
             if (opConstraint != null) {
                 partitionConstraintMap.put(opDesc, opConstraint);
                 AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index e76b486..154f4a1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -37,8 +38,10 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class AlgebricksOptimizationContext implements IOptimizationContext {
@@ -77,20 +80,22 @@
     protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
     private final IExpressionTypeComputer expressionTypeComputer;
     private final INullableTypeComputer nullableTypeComputer;
+    private final INodeDomain defaultNodeDomain;
     private final LogicalOperatorPrettyPrintVisitor prettyPrintVisitor;
 
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig) {
+            PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
         this(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
-                nullableTypeComputer, physicalOptimizationConfig, new LogicalOperatorPrettyPrintVisitor());
+                nullableTypeComputer, physicalOptimizationConfig, clusterLocations,
+                new LogicalOperatorPrettyPrintVisitor());
     }
 
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig,
+            PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations,
             LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
         this.varCounter = varCounter;
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
@@ -98,6 +103,7 @@
         this.expressionTypeComputer = expressionTypeComputer;
         this.nullableTypeComputer = nullableTypeComputer;
         this.physicalOptimizationConfig = physicalOptimizationConfig;
+        this.defaultNodeDomain = new DefaultNodeGroupDomain(clusterLocations);
         this.prettyPrintVisitor = prettyPrintVisitor;
     }
 
@@ -316,6 +322,11 @@
     }
 
     @Override
+    public INodeDomain getComputationNodeDomain() {
+        return defaultNodeDomain;
+    }
+
+    @Override
     public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
         return prettyPrintVisitor;
     }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 996b988..ce77942 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.rewriter.base;
 
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
@@ -29,5 +30,5 @@
             IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig);
+            PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations);
 }
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
index ae4f628..b5751fa 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
@@ -29,7 +29,7 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
@@ -145,13 +145,14 @@
             }
         });
         builder.setTypeTraitProvider(new ITypeTraitProvider() {
+            @Override
             public ITypeTraits getTypeTrait(Object type) {
                 return null;
             }
         });
         builder.setPrinterProvider(PigletPrinterFactoryProvider.INSTANCE);
-        builder.setExpressionRuntimeProvider(new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(
-                new PigletExpressionJobGen()));
+        builder.setExpressionRuntimeProvider(
+                new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(new PigletExpressionJobGen()));
         builder.setExpressionTypeComputer(new IExpressionTypeComputer() {
             @Override
             public Object getType(ILogicalExpression expr, IMetadataProvider<?, ?> metadataProvider,
@@ -159,6 +160,7 @@
                 return null;
             }
         });
+        builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(new String[] { "nc1", "nc2" }));
         cFactory = builder.create();
         metadataProvider = new PigletMetadataProvider();
     }
@@ -237,9 +239,8 @@
                 }
                 PigletFileDataSource ds = new PigletFileDataSource(file, types.toArray());
                 rel.op = new DataSourceScanOperator(variables, ds);
-                rel.op.getInputs().add(
-                        new MutableObject<ILogicalOperator>(previousOp == null ? new EmptyTupleSourceOperator()
-                                : previousOp));
+                rel.op.getInputs().add(new MutableObject<ILogicalOperator>(
+                        previousOp == null ? new EmptyTupleSourceOperator() : previousOp));
                 return rel;
             }
 
@@ -250,8 +251,9 @@
                 Relation inputRel = findInputRelation(alias, symMap);
                 Pair<Relation, LogicalVariable> tempInput = translateScalarExpression(inputRel, conditionNode);
                 Relation rel = new Relation();
-                rel.op = new SelectOperator(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
-                        tempInput.second)), false, null);
+                rel.op = new SelectOperator(
+                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(tempInput.second)), false,
+                        null);
                 rel.op.getInputs().add(new MutableObject<ILogicalOperator>(tempInput.first.op));
                 rel.schema.putAll(tempInput.first.schema);
                 return rel;
@@ -298,7 +300,8 @@
                 for (ASTNode a : arguments) {
                     Pair<Relation, LogicalVariable> argPair = translateScalarExpression(rel, (ExpressionNode) a);
                     rel = argPair.first;
-                    argExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(argPair.second)));
+                    argExprs.add(
+                            new MutableObject<ILogicalExpression>(new VariableReferenceExpression(argPair.second)));
                 }
                 Relation outRel = new Relation();
                 outRel.schema.putAll(rel.schema);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 8bf1ad5..53f7dbf 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -66,7 +66,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
-import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
@@ -93,8 +92,6 @@
 
 public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
-    private static final INodeDomain DEFAULT_DOMAIN = new DefaultNodeGroupDomain("__DEFAULT");
-
     private PhysicalOptimizationConfig physicalOptimizationConfig;
 
     @Override
@@ -127,7 +124,8 @@
 
         PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(op, context);
 
-        StructuralPropertiesVector pvector = new StructuralPropertiesVector(new RandomPartitioningProperty(null),
+        StructuralPropertiesVector pvector = new StructuralPropertiesVector(
+                new RandomPartitioningProperty(context.getComputationNodeDomain()),
                 new LinkedList<ILocalStructuralProperty>());
         boolean changed = physOptimizeOp(opRef, pvector, false, context);
         op.computeDeliveredPhysicalProperties(context);
@@ -196,7 +194,7 @@
         boolean changed = false;
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         optimizeUsingConstraintsAndEquivClasses(op);
-        PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required);
+        PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context);
         IPhysicalPropertiesVector[] reqdProperties = null;
         if (pr != null) {
             reqdProperties = pr.getRequiredProperties();
@@ -220,7 +218,7 @@
                 } else {
                     INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
                     if (!childrenDomain.sameAs(dom2)) {
-                        childrenDomain = DEFAULT_DOMAIN;
+                        childrenDomain = context.getComputationNodeDomain();
                     }
                 }
                 j++;
@@ -443,7 +441,7 @@
                     .getDeliveredPhysicalProperties();
             addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context);
         } else {
-            addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, domain, context);
+            addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, pp.getNodeDomain(), context);
             AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
             IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, required, true, context);
             AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");