Merge branch 'ecarm002/json_constraints'
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index f0e8510..128978b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,6 +29,9 @@
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -38,7 +42,7 @@
public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
private static final long serialVersionUID = 1L;
-
+
private static final int DEFAULT_FRAME_SIZE = 32768;
private final List<OperatorDescriptorId> roots;
@@ -80,7 +84,7 @@
public JobSpecification() {
this(DEFAULT_FRAME_SIZE);
}
-
+
public JobSpecification(int frameSize) {
roots = new ArrayList<OperatorDescriptorId>();
resultSetIds = new ArrayList<ResultSetId>();
@@ -322,12 +326,44 @@
return buffer.toString();
}
+ @SuppressWarnings("incomplete-switch")
public JSONObject toJSON() throws JSONException {
JSONObject jjob = new JSONObject();
JSONArray jopArray = new JSONArray();
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : opMap.entrySet()) {
- jopArray.put(e.getValue().toJSON());
+ JSONObject op = e.getValue().toJSON();
+ if (!userConstraints.isEmpty()) {
+ // Add operator partition constraints to each JSON operator.
+ JSONObject pcObject = new JSONObject();
+ JSONObject pleObject = new JSONObject();
+ Iterator<Constraint> test = userConstraints.iterator();
+ while (test.hasNext()) {
+ Constraint constraint = test.next();
+ switch (constraint.getLValue().getTag()) {
+ case PARTITION_COUNT:
+ PartitionCountExpression pce = (PartitionCountExpression) constraint.getLValue();
+ if (e.getKey() == pce.getOperatorDescriptorId()) {
+ pcObject.put("count", getConstraintExpressionRValue(constraint));
+ }
+ break;
+ case PARTITION_LOCATION:
+ PartitionLocationExpression ple = (PartitionLocationExpression) constraint.getLValue();
+ if (e.getKey() == ple.getOperatorDescriptorId()) {
+ pleObject.put(Integer.toString(ple.getPartition()),
+ getConstraintExpressionRValue(constraint));
+ }
+ break;
+ }
+ }
+ if (pleObject.length() > 0) {
+ pcObject.put("location", pleObject);
+ }
+ if (pcObject.length() > 0) {
+ op.put("partition-constraints", pcObject);
+ }
+ }
+ jopArray.put(op);
}
jjob.put("operators", jopArray);
@@ -349,4 +385,14 @@
return jjob;
}
+
+ private static String getConstraintExpressionRValue(Constraint constraint) {
+ switch (constraint.getRValue().getTag()) {
+ case CONSTANT:
+ ConstantExpression ce = (ConstantExpression) constraint.getRValue();
+ return ce.getValue().toString();
+ default:
+ return constraint.getRValue().toString();
+ }
+ }
}
\ No newline at end of file