cloning of feed job spec using wrapping operator
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 08436f1..eb148d5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -1514,7 +1514,8 @@
metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
- FeedUtil.alterJobSpecificationForFeed(compiled);
+ //FeedUtil.alterJobSpecificationForFeed(compiled);
+ JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed2(compiled);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Altered feed ingestion spec to wrap operators");
@@ -1529,7 +1530,7 @@
releaseReadLatch();
readLatchAcquired = false;
}
- runJob(hcc, compiled, waitForCompletion);
+ runJob(hcc, newJobSpec, waitForCompletion);
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
index c809295..8c42aad 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -47,61 +47,87 @@
JobSpecification altered = null;
altered = new JobSpecification();
Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
- Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
- Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
- Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
- List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
- Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
+ // copy operators
+ Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
+ IOperatorDescriptor opDesc = entry.getValue();
+ FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, opDesc);
+ oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+ }
- while (opIt.hasNext()) {
- OperatorDescriptorId opId = opIt.next();
- IOperatorDescriptor op = operatorMap.get(opId);
- if (op instanceof FeedIntakeOperatorDescriptor) {
- opIdToOp.put(opId, op);
- opToOp.put(op, op);
- opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
- operatorMap.put(opId, op);
- } else if (op instanceof AlgebricksMetaOperatorDescriptor) {
- AlgebricksMetaOperatorDescriptor mop = (AlgebricksMetaOperatorDescriptor) op;
- IPushRuntimeFactory[] runtimeFactories = mop.getPipeline().getRuntimeFactories();
- boolean added = false;
- for (IPushRuntimeFactory rf : runtimeFactories) {
- if (rf instanceof CommitRuntimeFactory) {
- opIdToOp.put(opId, op);
- opToOp.put(op, op);
- opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
- operatorMap.put(opId, op);
- added = true;
+ // copy connectors
+ for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
+ IConnectorDescriptor connDesc = entry.getValue();
+ altered.getConnectorMap().put(connDesc.getConnectorId(), connDesc);
+ }
+
+ // make connections between operators
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
+ .getConnectorOperatorMap().entrySet()) {
+ IConnectorDescriptor connDesc = altered.getConnectorMap().get(entry.getKey());
+ Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
+ Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
+
+ IOperatorDescriptor leftOpDesc = altered.getOperatorMap().get(
+ oldNewOID.get(leftOp.getLeft().getOperatorId()));
+ IOperatorDescriptor rightOpDesc = altered.getOperatorMap().get(
+ oldNewOID.get(rightOp.getLeft().getOperatorId()));
+
+ altered.connect(connDesc, leftOpDesc, leftOp.getRight(), rightOpDesc, rightOp.getRight());
+ }
+
+ // prepare for setting partition constraints
+ Map<OperatorDescriptorId, List<String>> operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
+ Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<OperatorDescriptorId, Integer>();
+
+ for (Constraint constraint : spec.getUserConstraints()) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ OperatorDescriptorId opId;
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT:
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ if (operatorCounts.get(opId) == null) {
+ operatorCounts.put(opId, 1);
+ } else {
+ operatorCounts.put(opId, operatorCounts.get(opId) + 1);
}
- }
- if (!added) {
- opToReplace.add(op);
- IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(altered, op);
- }
- } else {
- opToReplace.add(op);
- IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(altered, op);
+ break;
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(opId));
+ List<String> locations = operatorLocations.get(opDesc.getOperatorId());
+ if (locations == null) {
+ locations = new ArrayList<String>();
+ operatorLocations.put(opDesc.getOperatorId(), locations);
+ }
+ String location = (String) ((ConstantExpression) cexpr).getValue();
+ locations.add(location);
+ break;
}
}
- // operators that were not changed
- for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
- if (opIdToOp.get(opId) != null) {
- operatorMap.put(opId, opIdToOp.get(opId));
+ // set absolute location constraints
+ for (Entry<OperatorDescriptorId, List<String>> entry : operatorLocations.entrySet()) {
+ IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(altered, opDesc,
+ entry.getValue().toArray(new String[] {}));
+ }
+
+ // set count constraints
+ for (Entry<OperatorDescriptorId, Integer> entry : operatorCounts.entrySet()) {
+ IOperatorDescriptor opDesc = altered.getOperatorMap().get(oldNewOID.get(entry.getKey()));
+ if (!operatorLocations.keySet().contains(entry.getKey())) {
+ PartitionConstraintHelper.addPartitionCountConstraint(altered, opDesc, entry.getValue());
}
}
- for (IOperatorDescriptor op : opToReplace) {
- spec.getOperatorMap().remove(op.getOperatorId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("New Job Spec:" + altered);
}
- for (IOperatorDescriptor op : opToReplace) {
- IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
- opIdToOp.put(op.getOperatorId(), newOp);
- opToOp.put(op, newOp);
- opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
- }
+ return altered;
}
@@ -160,7 +186,8 @@
}
// connectors
-
+
+ /*
for(Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()){
ConnectorDescriptorId cid= entry.getKey();
IConnectorDescriptor cdesc = entry.getValue();
@@ -168,10 +195,8 @@
((OneToOneConnectorDescriptor)cdesc).
}
}
-
-
-
-
+ */
+
// connector operator Map
for (ConnectorDescriptorId cid : spec.getConnectorOperatorMap().keySet()) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> p = spec
@@ -201,7 +226,8 @@
for (OperatorDescriptorId opId : keysForRemoval) {
spec.getOperatorOutputMap().remove(opId);
}
-
+
+ /*
for(OperatorDescriptorId opId : keysForAddition.keySet()){
List<IConnectorDescriptor> origConnectors = keysForAddition.get(opId);
List<IConnectorDescriptor> newConnectors = new ArrayList<IConnectorDescriptor>();
@@ -209,10 +235,8 @@
newConnectors.add(e)
}
- }
-
-
-
+ }*/
+
spec.getOperatorOutputMap().putAll(keysForAddition);
// operator input Map