added support for cloning a feed job spec, wrapping operators with meta versions
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index eb148d5..fd55a07 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -83,6 +83,7 @@
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints;
import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -90,6 +91,7 @@
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -1497,8 +1499,10 @@
throw new AsterixException("Unknown source feed :" + cfs.getFeedName());
}
+ FeedConnectionId feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName()
+ .getValue());
FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(mdTxnCtx,
- new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue()), null);
+ feedConnId, null);
boolean isFeedActive = FeedUtil.isFeedActive(recentActivity);
if (isFeedActive && !cfs.forceConnect()) {
throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
@@ -1506,16 +1510,25 @@
}
IDatasetDetails datasetDetails = dataset.getDatasetDetails();
if (datasetDetails.getDatasetType() != DatasetType.INTERNAL) {
- throw new IllegalArgumentException("Dataset " + cfs.getDatasetName().getValue()
- + " is not an interal dataset");
+ throw new AsterixException("Dataset " + cfs.getDatasetName().getValue() + " is not an interal dataset");
}
+
+ FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName,
+ cbfs.getPolicyName());
+ if (feedPolicy == null) {
+ feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx,
+ MetadataConstants.METADATA_DATAVERSE_NAME, cbfs.getPolicyName());
+ if (feedPolicy == null) {
+ throw new AsterixException("Unknown feed policy" + cbfs.getPolicyName());
+ }
+ }
+
cfs.initialize(metadataProvider.getMetadataTxnContext(), dataset, feed);
cbfs.setQuery(cfs.getQuery());
metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
- //FeedUtil.alterJobSpecificationForFeed(compiled);
- JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed2(compiled);
+ JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnId, feedPolicy);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Altered feed ingestion spec to wrap operators");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
index 8c42aad..0a93a8e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -2,24 +2,22 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
-import edu.uci.ics.asterix.algebra.operators.physical.CommitRuntimeFactory;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
@@ -32,7 +30,6 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
public class FeedUtil {
@@ -43,7 +40,8 @@
.getActivityType().equals(FeedActivityType.FEED_FAILURE)));
}
- public static JobSpecification alterJobSpecificationForFeed2(JobSpecification spec) {
+ public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
+ FeedConnectionId feedConnectionId, FeedPolicy feedPolicy) {
JobSpecification altered = null;
altered = new JobSpecification();
Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
@@ -52,20 +50,40 @@
Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
IOperatorDescriptor opDesc = entry.getValue();
- FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, opDesc);
- oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+ if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+ FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
+ FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
+ orig.getAdapterFactory(), (ARecordType) orig.getAtype(), orig.getRecordDescriptor(),
+ orig.getFeedPolicy());
+ oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
+ } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
+ AsterixLSMTreeInsertDeleteOperatorDescriptor orig = (AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc;
+ AsterixLSMTreeInsertDeleteOperatorDescriptor liop = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
+ altered, orig.getRecordDescriptor(), orig.getStorageManager(),
+ orig.getLifecycleManagerProvider(), orig.getFileSplitProvider(), orig.getTreeIndexTypeTraits(),
+ orig.getComparatorFactories(), orig.getTreeIndexBloomFilterKeyFields(),
+ orig.getFieldPermutations(), orig.getIndexOperation(), orig.getIndexDataflowHelperFactory(),
+ orig.getTupleFilterFactory(), orig.getModificationOpCallbackFactory(), orig.isPrimary());
+ oldNewOID.put(opDesc.getOperatorId(), liop.getOperatorId());
+ } else {
+ FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
+ feedPolicy);
+ oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+ }
}
// copy connectors
+ Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
IConnectorDescriptor connDesc = entry.getValue();
- altered.getConnectorMap().put(connDesc.getConnectorId(), connDesc);
+ ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
+ connectorMapping.put(entry.getKey(), newConnId);
}
// make connections between operators
for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
.getConnectorOperatorMap().entrySet()) {
- IConnectorDescriptor connDesc = altered.getConnectorMap().get(entry.getKey());
+ IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
@@ -123,6 +141,17 @@
}
}
+ // useConnectorSchedulingPolicy
+ altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+
+ // connectorAssignmentPolicy
+ altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
+
+ // roots
+ for (OperatorDescriptorId root : spec.getRoots()) {
+ altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
+ }
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("New Job Spec:" + altered);
}
@@ -131,173 +160,4 @@
}
- public static void alterJobSpecificationForFeed(JobSpecification spec) {
-
- Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
- Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
- Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
- Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
-
- Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
- List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
- while (opIt.hasNext()) {
- OperatorDescriptorId opId = opIt.next();
- IOperatorDescriptor op = operatorMap.get(opId);
- if (op instanceof FeedIntakeOperatorDescriptor) {
- opIdToOp.put(opId, op);
- opToOp.put(op, op);
- opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
- } else if (op instanceof AlgebricksMetaOperatorDescriptor) {
- AlgebricksMetaOperatorDescriptor mop = (AlgebricksMetaOperatorDescriptor) op;
- IPushRuntimeFactory[] runtimeFactories = mop.getPipeline().getRuntimeFactories();
- boolean added = false;
- for (IPushRuntimeFactory rf : runtimeFactories) {
- if (rf instanceof CommitRuntimeFactory) {
- opIdToOp.put(opId, op);
- opToOp.put(op, op);
- opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
- added = true;
- }
- }
- if (!added) {
- opToReplace.add(op);
- }
- } else {
- opToReplace.add(op);
- }
- }
-
- // operator map
- for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
- if (opIdToOp.get(opId) != null) {
- operatorMap.put(opId, opIdToOp.get(opId));
- }
- }
-
- for (IOperatorDescriptor op : opToReplace) {
- spec.getOperatorMap().remove(op.getOperatorId());
- }
-
- for (IOperatorDescriptor op : opToReplace) {
- IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
- opIdToOp.put(op.getOperatorId(), newOp);
- opToOp.put(op, newOp);
- opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
- }
-
- // connectors
-
- /*
- for(Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()){
- ConnectorDescriptorId cid= entry.getKey();
- IConnectorDescriptor cdesc = entry.getValue();
- if(cdesc instanceof OneToOneConnectorDescriptor){
- ((OneToOneConnectorDescriptor)cdesc).
- }
- }
- */
-
- // connector operator Map
- for (ConnectorDescriptorId cid : spec.getConnectorOperatorMap().keySet()) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> p = spec
- .getConnectorOperatorMap().get(cid);
-
- Pair<IOperatorDescriptor, Integer> leftPair = p.getLeft();
- Pair<IOperatorDescriptor, Integer> newLeftPair = Pair.of(opToOp.get(leftPair.getLeft()),
- leftPair.getRight());
-
- Pair<IOperatorDescriptor, Integer> newRightPair = Pair.of(opToOp.get(p.getRight().getLeft()), p.getRight()
- .getRight());
-
- spec.getConnectorOperatorMap().put(cid, Pair.of(newLeftPair, newRightPair));
- }
-
- // operator Output Map
- Set<OperatorDescriptorId> keysForRemoval = new HashSet<OperatorDescriptorId>();
- Map<OperatorDescriptorId, List<IConnectorDescriptor>> keysForAddition = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
- for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorOutputMap().entrySet()) {
- OperatorDescriptorId opId = entry.getKey();
- if (!opIdToOpId.get(opId).equals(opId)) {
- keysForRemoval.add(opId);
- keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
- }
- }
-
- for (OperatorDescriptorId opId : keysForRemoval) {
- spec.getOperatorOutputMap().remove(opId);
- }
-
- /*
- for(OperatorDescriptorId opId : keysForAddition.keySet()){
- List<IConnectorDescriptor> origConnectors = keysForAddition.get(opId);
- List<IConnectorDescriptor> newConnectors = new ArrayList<IConnectorDescriptor>();
- for(IConnectorDescriptor connDesc : origConnectors){
- newConnectors.add(e)
- }
-
- }*/
-
- spec.getOperatorOutputMap().putAll(keysForAddition);
-
- // operator input Map
- keysForRemoval.clear();
- keysForAddition.clear();
- for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorInputMap().entrySet()) {
- OperatorDescriptorId opId = entry.getKey();
- if (!opIdToOpId.get(opId).equals(opId)) {
- keysForRemoval.add(opId);
- keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
- }
- }
-
- for (OperatorDescriptorId opId : keysForRemoval) {
- spec.getOperatorInputMap().remove(opId);
- }
- spec.getOperatorInputMap().putAll(keysForAddition);
-
- Set<Constraint> userConstraints = spec.getUserConstraints();
- Set<Constraint> constraintsForRemoval = new HashSet<Constraint>();
- Map<OperatorDescriptorId, List<String>> constraintsForAddition = new HashMap<OperatorDescriptorId, List<String>>();
-
- OperatorDescriptorId opId;
- for (Constraint constraint : userConstraints) {
- LValueConstraintExpression lexpr = constraint.getLValue();
- ConstraintExpression cexpr = constraint.getRValue();
- switch (lexpr.getTag()) {
- case PARTITION_COUNT:
- opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
- if (!opIdToOpId.get(opId).equals(opId)) {
- constraintsForRemoval.add(constraint);
- }
- break;
- case PARTITION_LOCATION:
- opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
- if (!opIdToOpId.get(opId).equals(opId)) {
- constraintsForRemoval.add(constraint);
- String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
- List<String> locations = constraintsForAddition.get(opId);
- if (locations == null) {
- locations = new ArrayList<String>();
- constraintsForAddition.put(opId, locations);
- }
- locations.add(oldLocation);
- }
- break;
- }
- }
-
- spec.getUserConstraints().removeAll(constraintsForRemoval);
- for (Entry<OperatorDescriptorId, List<String>> entry : constraintsForAddition.entrySet()) {
- OperatorDescriptorId oldOpId = entry.getKey();
- OperatorDescriptorId newOpId = opIdToOpId.get(oldOpId);
- if (!newOpId.equals(oldOpId)) {
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, opIdToOp.get(oldOpId), entry.getValue()
- .toArray(new String[] {}));
- }
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Modified job spec with wrapped operators\n" + spec);
- }
- }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 4402959..325dc2e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -55,6 +55,7 @@
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -238,17 +239,25 @@
Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
- if (entry.getValue() instanceof AlgebricksMetaOperatorDescriptor) {
- AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) entry.getValue());
+ IOperatorDescriptor opDesc = entry.getValue();
+ IOperatorDescriptor actualOp = null;
+ if (opDesc instanceof FeedMetaOperatorDescriptor) {
+ actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
+ } else {
+ actualOp = opDesc;
+ }
+
+ if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
+ AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) actualOp);
IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
for (IPushRuntimeFactory rf : runtimeFactories) {
if (rf instanceof AssignRuntimeFactory) {
computeOperatorIds.add(entry.getKey());
}
}
- } else if (entry.getValue() instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+ } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
storageOperatorIds.add(entry.getKey());
- } else if (entry.getValue() instanceof FeedIntakeOperatorDescriptor) {
+ } else if (actualOp instanceof FeedIntakeOperatorDescriptor) {
ingestOperatorIds.add(entry.getKey());
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
index e58bbfa..6f6a7b2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -56,4 +56,19 @@
recordDescProvider, op, isPrimary);
}
+ public boolean isPrimary() {
+ return isPrimary;
+ }
+
+ public int[] getFieldPermutations() {
+ return fieldPermutation;
+ }
+
+ public IndexOperation getIndexOperation() {
+ return op;
+ }
+
+ public IBinaryComparatorFactory[] getComparatorFactories() {
+ return comparatorFactories;
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index ee87272..0889711 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -68,4 +68,16 @@
public Map<String, String> getFeedPolicy() {
return feedPolicy;
}
+
+ public IAdapterFactory getAdapterFactory() {
+ return adapterFactory;
+ }
+
+ public IAType getAtype() {
+ return atype;
+ }
+
+ public RecordDescriptor getRecordDescriptor() {
+ return recordDescriptors[0];
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 4ad7617..9acfc63 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -1,7 +1,10 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -14,56 +17,102 @@
public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private IOperatorDescriptor coreOperator;
+ private static final Logger LOGGER = Logger.getLogger(FeedMetaOperatorDescriptor.class.getName());
- public FeedMetaOperatorDescriptor(JobSpecification spec, IOperatorDescriptor coreOperatorDescriptor) {
+ private IOperatorDescriptor coreOperator;
+ private final FeedConnectionId feedConnectionId;
+ private final FeedPolicy feedPolicy;
+
+ public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
+ IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy) {
super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
+ this.feedConnectionId = feedConnectionId;
+ this.feedPolicy = feedPolicy;
+ if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
+ recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
+ }
this.coreOperator = coreOperatorDescriptor;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
- return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator);
+ return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
+ feedConnectionId, feedPolicy);
}
@Override
public String toString() {
- return coreOperator.getDisplayName();
+ return "FeedMeta [" + coreOperator + " ]";
}
private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
+ private FeedPolicyEnforcer policyEnforcer;
public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions, IOperatorDescriptor coreOperator) throws HyracksDataException {
+ int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ FeedPolicy feedPolicy) throws HyracksDataException {
this.coreOperatorNodePushable = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
.createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicy.getProperties());
}
@Override
public void open() throws HyracksDataException {
- // coreOperatorNodePushable.setOutputFrameWriter(partition, coreOperatorNodePushable, recordDescProvider);
coreOperatorNodePushable.setOutputFrameWriter(0, writer, recordDesc);
coreOperatorNodePushable.open();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " open ");
+ }
+
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- coreOperatorNodePushable.nextFrame(buffer);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " received frame ");
+ }
+ try {
+ coreOperatorNodePushable.nextFrame(buffer);
+ } catch (HyracksDataException e) {
+ // log tuple
+ if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
+ } else {
+ throw e;
+ }
+ }
}
@Override
public void fail() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " fail ");
+ }
coreOperatorNodePushable.fail();
}
@Override
public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " close ");
+ }
coreOperatorNodePushable.close();
}
}
+ public IOperatorDescriptor getCoreOperator() {
+ return coreOperator;
+ }
+
+ public FeedConnectionId getFeedConnectionId() {
+ return feedConnectionId;
+ }
+
+ public FeedPolicy getFeedPolicy() {
+ return feedPolicy;
+ }
+
}