add sender side materialzing pipelining policy for load dataset/index jobs
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@126 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index d179eb6..3666251 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
@@ -211,7 +212,7 @@
splitsAndConstraint.second);
spec.connect(new OneToOneConnectorDescriptor(spec), asterixOp, 0, bulkLoad, 0);
-
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
spec.addRoot(bulkLoad);
} catch (AlgebricksException e) {
throw new AsterixException(e);
@@ -355,7 +356,8 @@
}
spec.addRoot(btreeBulkLoad);
-
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+
jobSpecs.add(new Job(spec));
return jobSpecs;
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index f9ac244..5e6ffa1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
@@ -330,7 +331,7 @@
spec.addRoot(secondaryBulkLoadOp);
// ---------- END CONNECT THE OPERATORS
-
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
@@ -550,7 +551,7 @@
spec.addRoot(secondaryBulkLoadOp);
// ---------- END CONNECT THE OPERATORS
-
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
@@ -806,7 +807,7 @@
spec.addRoot(secondaryBulkLoadOp);
// ---------- END CONNECT THE OPERATORS
-
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}