[NO ISSUE][RT] Add Support For Feed Ingestion Without Message
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add an option to specify whether FeedMetaStoreNodePushable
should process a message as part of the feed data or not.
Change-Id: I804fb4ae884020906dc09be188fa976867662d40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7404
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 8ea0ad4..20a97ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -309,7 +309,7 @@
if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
&& ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
- feedPolicyEntity.getProperties(), FeedRuntimeType.STORE);
+ feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, true);
opId = metaOp.getOperatorId();
opDesc.setOperatorId(opId);
} else {
@@ -323,7 +323,7 @@
// anything on the network interface needs to be message compatible
if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
- feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE);
+ feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, true);
opId = metaOp.getOperatorId();
opDesc.setOperatorId(opId);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index 74858ce..3f02927 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -67,14 +67,17 @@
private final Map<String, String> feedPolicyProperties;
/**
- * type for the feed runtime associated with the operator.
+ * Type for the feed runtime associated with the operator.
* Possible values: COMPUTE, STORE, OTHER
**/
private final FeedRuntimeType runtimeType;
+ /** Whether the incoming frame has a message that this operator should handle **/
+ private final boolean hasMessage;
+
public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId feedConnectionId,
final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String> feedPolicyProperties,
- final FeedRuntimeType runtimeType) {
+ final FeedRuntimeType runtimeType, boolean hasMessage) {
super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
this.feedConnectionId = feedConnectionId;
this.feedPolicyProperties = feedPolicyProperties;
@@ -83,6 +86,7 @@
}
this.coreOperator = coreOperatorDescriptor;
this.runtimeType = runtimeType;
+ this.hasMessage = hasMessage;
}
@Override
@@ -97,7 +101,7 @@
break;
case STORE:
nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
- coreOperator, feedConnectionId, feedPolicyProperties, this);
+ coreOperator, feedConnectionId, feedPolicyProperties, this, hasMessage);
break;
default:
throw new RuntimeDataException(ErrorCode.OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 94ae75c..7548313 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -51,8 +51,11 @@
private static final Logger LOGGER = LogManager.getLogger();
+ /** Whether the incoming frame has a message that this operator should handle **/
+ private final boolean hasMessage;
+
/** Runtime node pushable corresponding to the core feed operator **/
- private AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
+ private final AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
/**
* A policy accessor that ensures dyanmic decisions for a feed are taken
@@ -92,10 +95,11 @@
private final long traceCategory;
- public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
- Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor)
- throws HyracksDataException {
+ FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+ Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor,
+ boolean hasMessage) throws HyracksDataException {
+ this.hasMessage = hasMessage;
this.ctx = ctx;
this.insertOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
.createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
@@ -104,8 +108,13 @@
this.connectionId = feedConnectionId;
this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
- this.message = new VSizeFrame(ctx);
- TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+ if (hasMessage) {
+ this.message = new VSizeFrame(ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+ } else {
+ this.message = null;
+ }
+
this.recordDescProvider = recordDescProvider;
this.opDesc = feedMetaOperatorDescriptor;
tracer = ctx.getJobletContext().getServiceContext().getTracer();
@@ -147,7 +156,9 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
long tid = tracer.durationB("Ingestion-Store", traceCategory, null);
try {
- FeedUtils.processFeedMessage(buffer, message, fta);
+ if (hasMessage) {
+ FeedUtils.processFeedMessage(buffer, message, fta);
+ }
writer.nextFrame(buffer);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Failure Processing a frame at store side", e);