ensure that feed alteration maintains ordering of location constraints and partitions
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 2ac657f..d7eed24 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -1,6 +1,8 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,6 +51,11 @@
.getActivityType().equals(FeedActivityType.FEED_FAILURE)));
}
+ private static class LocationConstraint {
+ int partition;
+ String location;
+ }
+
public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
FeedConnectionId feedConnectionId, FeedPolicy feedPolicy) {
@@ -128,7 +135,7 @@
}
// prepare for setting partition constraints
- Map<OperatorDescriptorId, List<String>> operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
+ Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<OperatorDescriptorId, List<LocationConstraint>>();
Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>();
for (Constraint constraint : spec.getUserConstraints()) {
@@ -146,23 +153,37 @@
break;
case PARTITION_LOCATION:
opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+
IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId));
- List<String> locations = operatorLocations.get(opDesc.getOperatorId());
+ List<LocationConstraint> locations = operatorLocations.get(opDesc.getOperatorId());
if (locations == null) {
- locations = new ArrayList<String>();
+ locations = new ArrayList<>();
operatorLocations.put(opDesc.getOperatorId(), locations);
}
String location = (String) ((ConstantExpression) cexpr).getValue();
- locations.add(location);
+ LocationConstraint lc = new LocationConstraint();
+ lc.location = location;
+ lc.partition = ((PartitionLocationExpression) lexpr).getPartition();
+ locations.add(lc);
break;
}
}
// set absolute location constraints
- for (Entry<OperatorDescriptorId, List<String>> entry : operatorLocations.entrySet()) {
+ for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry : operatorLocations.entrySet()) {
IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
- PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc,
- entry.getValue().toArray(new String[] {}));
+ Collections.sort(entry.getValue(), new Comparator<LocationConstraint>() {
+
+ @Override
+ public int compare(LocationConstraint o1, LocationConstraint o2) {
+ return o1.partition - o2.partition;
+ }
+ });
+ String[] locations = new String[entry.getValue().size()];
+ for (int i = 0; i < locations.length; ++i) {
+ locations[i] = entry.getValue().get(i).location;
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc, locations);
}
// set count constraints