checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 41cda8a..08436f1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -72,6 +72,7 @@
import edu.uci.ics.asterix.file.DatasetOperations;
import edu.uci.ics.asterix.file.DataverseOperations;
import edu.uci.ics.asterix.file.FeedOperations;
+import edu.uci.ics.asterix.file.FeedUtil;
import edu.uci.ics.asterix.file.IndexOperations;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
@@ -95,7 +96,6 @@
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
@@ -1516,6 +1516,10 @@
JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
FeedUtil.alterJobSpecificationForFeed(compiled);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Altered feed ingestion spec to wrap operators");
+ }
+
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
String waitForCompletionParam = metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
new file mode 100644
index 0000000..c0d6005
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -0,0 +1,193 @@
+package edu.uci.ics.asterix.file;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.asterix.algebra.operators.physical.CommitRuntimeFactory;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class FeedUtil {
+
+ private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
+
+ public static boolean isFeedActive(FeedActivity feedActivity) {
+ return (feedActivity != null && !(feedActivity.getActivityType().equals(FeedActivityType.FEED_END) || feedActivity
+ .getActivityType().equals(FeedActivityType.FEED_FAILURE)));
+ }
+
+ public static void alterJobSpecificationForFeed(JobSpecification spec) {
+
+ Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
+ Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
+ Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
+ Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
+
+ Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
+ IOperatorDescriptor ingestOp = null;
+ OperatorDescriptorId ingestOpId = null;
+ List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
+ while (opIt.hasNext()) {
+ OperatorDescriptorId opId = opIt.next();
+ IOperatorDescriptor op = operatorMap.get(opId);
+ if (op instanceof FeedIntakeOperatorDescriptor) {
+ opIdToOp.put(opId, op);
+ opToOp.put(op, op);
+ opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
+ } else if (op instanceof AlgebricksMetaOperatorDescriptor) {
+ AlgebricksMetaOperatorDescriptor mop = (AlgebricksMetaOperatorDescriptor) op;
+ IPushRuntimeFactory[] runtimeFactories = mop.getPipeline().getRuntimeFactories();
+ boolean added = false;
+ for (IPushRuntimeFactory rf : runtimeFactories) {
+ if (rf instanceof CommitRuntimeFactory) {
+ opIdToOp.put(opId, op);
+ opToOp.put(op, op);
+ opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
+ added = true;
+ }
+ }
+ if (!added) {
+ opToReplace.add(op);
+ }
+ } else {
+ opToReplace.add(op);
+ }
+ }
+
+ // operator map
+ for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
+ if (opIdToOp.get(opId) != null) {
+ operatorMap.put(opId, opIdToOp.get(opId));
+ }
+ }
+
+ for (IOperatorDescriptor op : opToReplace) {
+ spec.getOperatorMap().remove(op.getOperatorId());
+ }
+
+ for (IOperatorDescriptor op : opToReplace) {
+ IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
+ opIdToOp.put(op.getOperatorId(), newOp);
+ opToOp.put(op, newOp);
+ opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
+ }
+
+ // connector operator Map
+ for (ConnectorDescriptorId cid : spec.getConnectorOperatorMap().keySet()) {
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> p = spec
+ .getConnectorOperatorMap().get(cid);
+
+ Pair<IOperatorDescriptor, Integer> leftPair = p.getLeft();
+ Pair<IOperatorDescriptor, Integer> newLeftPair = Pair.of(opToOp.get(leftPair.getLeft()),
+ leftPair.getRight());
+
+ Pair<IOperatorDescriptor, Integer> newRightPair = Pair.of(opToOp.get(p.getRight().getLeft()), p.getRight()
+ .getRight());
+
+ spec.getConnectorOperatorMap().put(cid, Pair.of(newLeftPair, newRightPair));
+ }
+
+ // operator Output Map
+ Set<OperatorDescriptorId> keysForRemoval = new HashSet<OperatorDescriptorId>();
+ Map<OperatorDescriptorId, List<IConnectorDescriptor>> keysForAddition = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
+ for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorOutputMap().entrySet()) {
+ OperatorDescriptorId opId = entry.getKey();
+ if (!opIdToOpId.get(opId).equals(opId)) {
+ keysForRemoval.add(opId);
+ keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
+ }
+ }
+
+ for (OperatorDescriptorId opId : keysForRemoval) {
+ spec.getOperatorOutputMap().remove(opId);
+ }
+ spec.getOperatorOutputMap().putAll(keysForAddition);
+
+ // operator input Map
+ keysForRemoval.clear();
+ keysForAddition.clear();
+ for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorInputMap().entrySet()) {
+ OperatorDescriptorId opId = entry.getKey();
+ if (!opIdToOpId.get(opId).equals(opId)) {
+ keysForRemoval.add(opId);
+ keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
+ }
+ }
+
+ for (OperatorDescriptorId opId : keysForRemoval) {
+ spec.getOperatorInputMap().remove(opId);
+ }
+ spec.getOperatorInputMap().putAll(keysForAddition);
+
+ Set<Constraint> userConstraints = spec.getUserConstraints();
+ Set<Constraint> constraintsForRemoval = new HashSet<Constraint>();
+ Map<OperatorDescriptorId, List<String>> constraintsForAddition = new HashMap<OperatorDescriptorId, List<String>>();
+
+ OperatorDescriptorId opId;
+ for (Constraint constraint : userConstraints) {
+ LValueConstraintExpression lexpr = constraint.getLValue();
+ ConstraintExpression cexpr = constraint.getRValue();
+ switch (lexpr.getTag()) {
+ case PARTITION_COUNT:
+ opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
+ if (!opIdToOpId.get(opId).equals(opId)) {
+ constraintsForRemoval.add(constraint);
+ }
+ break;
+ case PARTITION_LOCATION:
+ opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
+ if (!opIdToOpId.get(opId).equals(opId)) {
+ constraintsForRemoval.add(constraint);
+ String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
+ List<String> locations = constraintsForAddition.get(opId);
+ if (locations == null) {
+ locations = new ArrayList<String>();
+ constraintsForAddition.put(opId, locations);
+ }
+ locations.add(oldLocation);
+ }
+ break;
+ }
+ }
+
+ spec.getUserConstraints().removeAll(constraintsForRemoval);
+ for (Entry<OperatorDescriptorId, List<String>> entry : constraintsForAddition.entrySet()) {
+ OperatorDescriptorId oldOpId = entry.getKey();
+ OperatorDescriptorId newOpId = opIdToOpId.get(oldOpId);
+ if (!newOpId.equals(oldOpId)) {
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, opIdToOp.get(oldOpId), entry.getValue()
+ .toArray(new String[] {}));
+ }
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Modified job spec with wrapped operators\n" + spec);
+ }
+ }
+}
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 21caf28..4d27e48 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -131,6 +131,8 @@
@Test
public void test() throws Exception {
- TestsUtils.executeTest(PATH_ACTUAL, tcCtx);
+ if (tcCtx.getTestCase().getCompilationUnit().get(0).getName().contains("feeds")) {
+ TestsUtils.executeTest(PATH_ACTUAL, tcCtx);
+ }
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index b726955..fae2fe4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -44,4 +44,6 @@
return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessage, partition, nPartitions);
}
+
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 1f3c43a..4ad7617 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -27,6 +27,11 @@
return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator);
}
+ @Override
+ public String toString() {
+ return coreOperator.getDisplayName();
+ }
+
private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
@@ -39,6 +44,8 @@
@Override
public void open() throws HyracksDataException {
+ // coreOperatorNodePushable.setOutputFrameWriter(partition, coreOperatorNodePushable, recordDescProvider);
+ coreOperatorNodePushable.setOutputFrameWriter(0, writer, recordDesc);
coreOperatorNodePushable.open();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
deleted file mode 100644
index ed4c532..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class FeedUtil {
-
- public static boolean isFeedActive(FeedActivity feedActivity) {
- return (feedActivity != null && !(feedActivity.getActivityType().equals(FeedActivityType.FEED_END) || feedActivity
- .getActivityType().equals(FeedActivityType.FEED_FAILURE)));
- }
-
- public static void alterJobSpecificationForFeed(JobSpecification spec) {
-
- Map<OperatorDescriptorId, IOperatorDescriptor> r1 = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
- Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
- Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
- Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
-
- for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : spec.getOperatorMap().entrySet()) {
- OperatorDescriptorId opId = entry.getKey();
- IOperatorDescriptor op = entry.getValue();
- if (!(op instanceof FeedIntakeOperatorDescriptor)) {
- IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
- opIdToOp.put(opId, newOp);
- opToOp.put(op, newOp);
- } else {
- opIdToOp.put(opId, op);
- opToOp.put(op, op);
- }
- }
-
- for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
- operatorMap.put(opId, opIdToOp.get(opId));
- }
-
- Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap = spec
- .getConnectorOperatorMap();
- Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> r2 = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : connectorOpMap
- .entrySet()) {
- IOperatorDescriptor opLeft = entry.getValue().getLeft().getLeft();
- IOperatorDescriptor opRight = entry.getValue().getRight().getLeft();
-
- if ((!opLeft instanceof FeedIntakeOperatorDescriptor)) {
-
- }
-
- }
-
- }
-}