[NO ISSUE][RT] Add Support For Feed Ingestion Without Message

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add an option to specify whether FeedMetaStoreNodePushable
  should process a message as part of the feed data or not.

Change-Id: I804fb4ae884020906dc09be188fa976867662d40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7404
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 8ea0ad4..20a97ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -309,7 +309,7 @@
                 if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                         && ((LSMTreeInsertDeleteOperatorDescriptor) opDesc).isPrimary()) {
                     metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
-                            feedPolicyEntity.getProperties(), FeedRuntimeType.STORE);
+                            feedPolicyEntity.getProperties(), FeedRuntimeType.STORE, true);
                     opId = metaOp.getOperatorId();
                     opDesc.setOperatorId(opId);
                 } else {
@@ -323,7 +323,7 @@
                             // anything on the network interface needs to be message compatible
                             if (connectorDesc instanceof MToNPartitioningConnectorDescriptor) {
                                 metaOp = new FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
-                                        feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE);
+                                        feedPolicyEntity.getProperties(), FeedRuntimeType.COMPUTE, true);
                                 opId = metaOp.getOperatorId();
                                 opDesc.setOperatorId(opId);
                             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index 74858ce..3f02927 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -67,14 +67,17 @@
     private final Map<String, String> feedPolicyProperties;
 
     /**
-     * type for the feed runtime associated with the operator.
+     * Type for the feed runtime associated with the operator.
      * Possible values: COMPUTE, STORE, OTHER
      **/
     private final FeedRuntimeType runtimeType;
 
+    /** Whether the incoming frame has a message that this operator should handle **/
+    private final boolean hasMessage;
+
     public FeedMetaOperatorDescriptor(final JobSpecification spec, final FeedConnectionId feedConnectionId,
             final IOperatorDescriptor coreOperatorDescriptor, final Map<String, String> feedPolicyProperties,
-            final FeedRuntimeType runtimeType) {
+            final FeedRuntimeType runtimeType, boolean hasMessage) {
         super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;
@@ -83,6 +86,7 @@
         }
         this.coreOperator = coreOperatorDescriptor;
         this.runtimeType = runtimeType;
+        this.hasMessage = hasMessage;
     }
 
     @Override
@@ -97,7 +101,7 @@
                 break;
             case STORE:
                 nodePushable = new FeedMetaStoreNodePushable(ctx, recordDescProvider, partition, nPartitions,
-                        coreOperator, feedConnectionId, feedPolicyProperties, this);
+                        coreOperator, feedConnectionId, feedPolicyProperties, this, hasMessage);
                 break;
             default:
                 throw new RuntimeDataException(ErrorCode.OPERATORS_FEED_META_OPERATOR_DESCRIPTOR_INVALID_RUNTIME,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 94ae75c..7548313 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -51,8 +51,11 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
 
+    /** Whether the incoming frame has a message that this operator should handle **/
+    private final boolean hasMessage;
+
     /** Runtime node pushable corresponding to the core feed operator **/
-    private AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
+    private final AbstractUnaryInputUnaryOutputOperatorNodePushable insertOperator;
 
     /**
      * A policy accessor that ensures dyanmic decisions for a feed are taken
@@ -92,10 +95,11 @@
 
     private final long traceCategory;
 
-    public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
-            Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor)
-            throws HyracksDataException {
+    FeedMetaStoreNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition,
+            int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+            Map<String, String> feedPolicyProperties, FeedMetaOperatorDescriptor feedMetaOperatorDescriptor,
+            boolean hasMessage) throws HyracksDataException {
+        this.hasMessage = hasMessage;
         this.ctx = ctx;
         this.insertOperator = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                 .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
@@ -104,8 +108,13 @@
         this.connectionId = feedConnectionId;
         this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
-        this.message = new VSizeFrame(ctx);
-        TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+        if (hasMessage) {
+            this.message = new VSizeFrame(ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+        } else {
+            this.message = null;
+        }
+
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
         tracer = ctx.getJobletContext().getServiceContext().getTracer();
@@ -147,7 +156,9 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         long tid = tracer.durationB("Ingestion-Store", traceCategory, null);
         try {
-            FeedUtils.processFeedMessage(buffer, message, fta);
+            if (hasMessage) {
+                FeedUtils.processFeedMessage(buffer, message, fta);
+            }
             writer.nextFrame(buffer);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Failure Processing a frame at store side", e);