Added constraints to the operator JSON.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index f0e8510..a0d07a4 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,6 +29,8 @@
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -38,7 +41,7 @@
public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
private static final long serialVersionUID = 1L;
-
+
private static final int DEFAULT_FRAME_SIZE = 32768;
private final List<OperatorDescriptorId> roots;
@@ -80,7 +83,7 @@
public JobSpecification() {
this(DEFAULT_FRAME_SIZE);
}
-
+
public JobSpecification(int frameSize) {
roots = new ArrayList<OperatorDescriptorId>();
resultSetIds = new ArrayList<ResultSetId>();
@@ -322,12 +325,39 @@
return buffer.toString();
}
+ @SuppressWarnings("incomplete-switch")
public JSONObject toJSON() throws JSONException {
JSONObject jjob = new JSONObject();
JSONArray jopArray = new JSONArray();
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : opMap.entrySet()) {
- jopArray.put(e.getValue().toJSON());
+ JSONObject op = e.getValue().toJSON();
+ if (!userConstraints.isEmpty()) {
+ // Add operator partition constraints to each JSON operator.
+ JSONObject pleObject = new JSONObject();
+ Iterator<Constraint> test = userConstraints.iterator();
+ while (test.hasNext()) {
+ Constraint constraint = test.next();
+ switch (constraint.getLValue().getTag()) {
+ case PARTITION_COUNT:
+ PartitionCountExpression pce = (PartitionCountExpression) constraint.getLValue();
+ if (e.getKey() == pce.getOperatorDescriptorId()) {
+ op.put("partition-count", constraint.getRValue().toString());
+ }
+ break;
+ case PARTITION_LOCATION:
+ PartitionLocationExpression ple = (PartitionLocationExpression) constraint.getLValue();
+ if (e.getKey() == ple.getOperatorDescriptorId()) {
+ pleObject.put(Integer.toString(ple.getPartition()), constraint.getRValue().toString());
+ }
+ break;
+ }
+ }
+ if (pleObject.length() > 0) {
+ op.put("partition-location", pleObject);
+ }
+ }
+ jopArray.put(op);
}
jjob.put("operators", jopArray);