add sender side materialzing pipelining policy for load dataset/index jobs

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@126 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index d179eb6..3666251 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
@@ -211,7 +212,7 @@
                     splitsAndConstraint.second);
 
             spec.connect(new OneToOneConnectorDescriptor(spec), asterixOp, 0, bulkLoad, 0);
-
+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
             spec.addRoot(bulkLoad);
         } catch (AlgebricksException e) {
             throw new AsterixException(e);
@@ -355,7 +356,8 @@
         }
 
         spec.addRoot(btreeBulkLoad);
-
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        
         jobSpecs.add(new Job(spec));
         return jobSpecs;
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index f9ac244..5e6ffa1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
@@ -330,7 +331,7 @@
         spec.addRoot(secondaryBulkLoadOp);
 
         // ---------- END CONNECT THE OPERATORS
-
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;
 
     }
@@ -550,7 +551,7 @@
         spec.addRoot(secondaryBulkLoadOp);
 
         // ---------- END CONNECT THE OPERATORS
-
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;
 
     }
@@ -806,7 +807,7 @@
         spec.addRoot(secondaryBulkLoadOp);
 
         // ---------- END CONNECT THE OPERATORS
-
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;
     }