[ASTERIXDB-2020][HYR] Set Default Number of Reattempts to 0
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Set Default Number of Reattempts to 0
Change-Id: I922611fca234a77aca0947f4571c4ffad008273a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1914
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index a7d3864..2cf96c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -31,6 +31,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
+import org.apache.hyracks.api.constraints.expressions.ConstraintExpression.ExpressionTag;
import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -95,18 +96,18 @@
}
public JobSpecification(int frameSize) {
- roots = new ArrayList<OperatorDescriptorId>();
- resultSetIds = new ArrayList<ResultSetId>();
- opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
- connMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
- opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
- opOutputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
+ roots = new ArrayList<>();
+ resultSetIds = new ArrayList<>();
+ opMap = new HashMap<>();
+ connMap = new HashMap<>();
+ opInputMap = new HashMap<>();
+ opOutputMap = new HashMap<>();
connectorOpMap = new HashMap<>();
- properties = new HashMap<String, Serializable>();
- userConstraints = new HashSet<Constraint>();
+ properties = new HashMap<>();
+ userConstraints = new HashSet<>();
operatorIdCounter = 0;
connectorIdCounter = 0;
- maxReattempts = 2;
+ maxReattempts = 0;
useConnectorPolicyForScheduling = false;
requiredClusterCapacity = new ClusterCapacity();
setFrameSize(frameSize);
@@ -170,20 +171,20 @@
}
public RecordDescriptor getConnectorRecordDescriptor(IConnectorDescriptor conn) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
- .get(conn.getConnectorId());
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo =
+ connectorOpMap.get(conn.getConnectorId());
return connInfo.getLeft().getLeft().getOutputRecordDescriptors()[connInfo.getLeft().getRight()];
}
public IOperatorDescriptor getConsumer(IConnectorDescriptor conn) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
- .get(conn.getConnectorId());
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo =
+ connectorOpMap.get(conn.getConnectorId());
return connInfo.getRight().getLeft();
}
public int getConsumerInputIndex(IConnectorDescriptor conn) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
- .get(conn.getConnectorId());
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo =
+ connectorOpMap.get(conn.getConnectorId());
return connInfo.getRight().getRight();
}
@@ -224,14 +225,14 @@
}
public IOperatorDescriptor getProducer(IConnectorDescriptor conn) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
- .get(conn.getConnectorId());
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo =
+ connectorOpMap.get(conn.getConnectorId());
return connInfo.getLeft().getLeft();
}
public int getProducerOutputIndex(IConnectorDescriptor conn) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
- .get(conn.getConnectorId());
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo =
+ connectorOpMap.get(conn.getConnectorId());
return connInfo.getLeft().getRight();
}
@@ -310,7 +311,7 @@
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
- vList = new ArrayList<V>();
+ vList = new ArrayList<>();
map.put(key, vList);
}
extend(vList, index);
@@ -346,7 +347,6 @@
return buffer.toString();
}
- @SuppressWarnings("incomplete-switch")
public ObjectNode toJSON() throws IOException {
ObjectMapper om = new ObjectMapper();
ObjectNode jjob = om.createObjectNode();
@@ -361,20 +361,18 @@
Iterator<Constraint> test = userConstraints.iterator();
while (test.hasNext()) {
Constraint constraint = test.next();
- switch (constraint.getLValue().getTag()) {
- case PARTITION_COUNT:
- PartitionCountExpression pce = (PartitionCountExpression) constraint.getLValue();
- if (e.getKey() == pce.getOperatorDescriptorId()) {
- pcObject.put("count", getConstraintExpressionRValue(constraint));
- }
- break;
- case PARTITION_LOCATION:
- PartitionLocationExpression ple = (PartitionLocationExpression) constraint.getLValue();
- if (e.getKey() == ple.getOperatorDescriptorId()) {
- pleObject.put(Integer.toString(ple.getPartition()),
- getConstraintExpressionRValue(constraint));
- }
- break;
+ ExpressionTag tag = constraint.getLValue().getTag();
+ if (tag == ExpressionTag.PARTITION_COUNT) {
+ PartitionCountExpression pce = (PartitionCountExpression) constraint.getLValue();
+ if (e.getKey() == pce.getOperatorDescriptorId()) {
+ pcObject.put("count", getConstraintExpressionRValue(constraint));
+ }
+ } else if (tag == ExpressionTag.PARTITION_LOCATION) {
+ PartitionLocationExpression ple = (PartitionLocationExpression) constraint.getLValue();
+ if (e.getKey() == ple.getOperatorDescriptorId()) {
+ pleObject.put(Integer.toString(ple.getPartition()),
+ getConstraintExpressionRValue(constraint));
+ }
}
}
if (pleObject.size() > 0) {
@@ -391,8 +389,8 @@
ArrayNode jcArray = om.createArrayNode();
for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : connMap.entrySet()) {
ObjectNode conn = om.createObjectNode();
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connection = connectorOpMap
- .get(e.getKey());
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connection =
+ connectorOpMap.get(e.getKey());
if (connection != null) {
conn.put("in-operator-id", connection.getLeft().getLeft().getOperatorId().toString());
conn.put("in-operator-port", connection.getLeft().getRight().intValue());
@@ -408,12 +406,11 @@
}
private static String getConstraintExpressionRValue(Constraint constraint) {
- switch (constraint.getRValue().getTag()) {
- case CONSTANT:
- ConstantExpression ce = (ConstantExpression) constraint.getRValue();
- return ce.getValue().toString();
- default:
- return constraint.getRValue().toString();
+ if (constraint.getRValue().getTag() == ExpressionTag.CONSTANT) {
+ ConstantExpression ce = (ConstantExpression) constraint.getRValue();
+ return ce.getValue().toString();
+ } else {
+ return constraint.getRValue().toString();
}
}
}