checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
index c0d6005..c809295 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
public class FeedUtil {
@@ -42,6 +43,68 @@
.getActivityType().equals(FeedActivityType.FEED_FAILURE)));
}
+ public static JobSpecification alterJobSpecificationForFeed2(JobSpecification spec) {
+ JobSpecification altered = null;
+ altered = new JobSpecification();
+ Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
+ Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
+ Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
+ Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+
+ List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
+ Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
+
+ while (opIt.hasNext()) {
+ OperatorDescriptorId opId = opIt.next();
+ IOperatorDescriptor op = operatorMap.get(opId);
+ if (op instanceof FeedIntakeOperatorDescriptor) {
+ opIdToOp.put(opId, op);
+ opToOp.put(op, op);
+ opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
+ operatorMap.put(opId, op);
+ } else if (op instanceof AlgebricksMetaOperatorDescriptor) {
+ AlgebricksMetaOperatorDescriptor mop = (AlgebricksMetaOperatorDescriptor) op;
+ IPushRuntimeFactory[] runtimeFactories = mop.getPipeline().getRuntimeFactories();
+ boolean added = false;
+ for (IPushRuntimeFactory rf : runtimeFactories) {
+ if (rf instanceof CommitRuntimeFactory) {
+ opIdToOp.put(opId, op);
+ opToOp.put(op, op);
+ opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
+ operatorMap.put(opId, op);
+ added = true;
+ }
+ }
+ if (!added) {
+ opToReplace.add(op);
+ IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(altered, op);
+ }
+ } else {
+ opToReplace.add(op);
+ IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(altered, op);
+ }
+ }
+
+ // operators that were not changed
+ for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
+ if (opIdToOp.get(opId) != null) {
+ operatorMap.put(opId, opIdToOp.get(opId));
+ }
+ }
+
+ for (IOperatorDescriptor op : opToReplace) {
+ spec.getOperatorMap().remove(op.getOperatorId());
+ }
+
+ for (IOperatorDescriptor op : opToReplace) {
+ IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
+ opIdToOp.put(op.getOperatorId(), newOp);
+ opToOp.put(op, newOp);
+ opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
+ }
+
+ }
+
public static void alterJobSpecificationForFeed(JobSpecification spec) {
Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
@@ -50,8 +113,6 @@
Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
- IOperatorDescriptor ingestOp = null;
- OperatorDescriptorId ingestOpId = null;
List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
while (opIt.hasNext()) {
OperatorDescriptorId opId = opIt.next();
@@ -98,6 +159,19 @@
opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
}
+ // connectors
+
+ for(Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()){
+ ConnectorDescriptorId cid= entry.getKey();
+ IConnectorDescriptor cdesc = entry.getValue();
+ if(cdesc instanceof OneToOneConnectorDescriptor){
+ ((OneToOneConnectorDescriptor)cdesc).
+ }
+ }
+
+
+
+
// connector operator Map
for (ConnectorDescriptorId cid : spec.getConnectorOperatorMap().keySet()) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> p = spec
@@ -127,6 +201,18 @@
for (OperatorDescriptorId opId : keysForRemoval) {
spec.getOperatorOutputMap().remove(opId);
}
+
+ for(OperatorDescriptorId opId : keysForAddition.keySet()){
+ List<IConnectorDescriptor> origConnectors = keysForAddition.get(opId);
+ List<IConnectorDescriptor> newConnectors = new ArrayList<IConnectorDescriptor>();
+ for(IConnectorDescriptor connDesc : origConnectors){
+ newConnectors.add(e)
+ }
+
+ }
+
+
+
spec.getOperatorOutputMap().putAll(keysForAddition);
// operator input Map