ensure proper partitioning strategy for operators beneath datasourcescan
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 6285b96..fc926d2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -75,15 +75,17 @@
         boolean change = false;
         switch (op.getOperatorTag()) {
             case DATASOURCESCAN: {
-                // ILogicalExpression e = ((UnnestOperator) op).getExpression();
-                // if (AnalysisUtil.isDataSetCall(e)) {
                 op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
-                AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-                if (child.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                AbstractLogicalOperator currentOp = op;
+                while (currentOp.getInputs().size() == 1) {
+                    AbstractLogicalOperator child = (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
+                    if (child.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+                        break;
+                    }
                     child.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+                    currentOp = child;
                 }
                 change = true;
-                // }
                 break;
             }
             case NESTEDTUPLESOURCE: {