[NO ISSUE][OTH] Add option to enable/disable ingestion events logging
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
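- Add a new feed option (log-ingestion-events) to enable or disable
ingestion event logging; it defaults to true. When it is set to false,
a no-op feed log manager (NoOpFeedLogManager) is used in place of
FeedLogManager.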
Change-Id: I890c37e2ad70a5c7742e97ebdc311c9eae04e0b4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17518
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
index c73f8e8..f0ecd89 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
@@ -22,7 +22,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class FunctionReader implements IRecordReader<char[]> {
@@ -43,7 +43,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException {
throw new UnsupportedOperationException();
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index 8ada33c..b0a09aa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "dataset-dataverse", "Value": "feeds" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "type-name", "Value": "TweetType" } }}, "Timestamp": "Tue Mar 31 10:30:06 PDT 2020" }
\ No newline at end of file
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "dataset-dataverse", "Value": "feeds" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "log-ingestion-events", "Value": "true" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "type-name", "Value": "TweetType" } }}, "Timestamp": "Mon May 08 20:53:16 PDT 2023" }
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 214f3b3..fb08586 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -41,6 +41,8 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.asterix.external.util.NoOpFeedLogManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -57,16 +59,17 @@
public class GenericAdapterFactory implements ITypedAdapterFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private static final Logger LOGGER = LogManager.getLogger();
private IExternalDataSourceFactory dataSourceFactory;
private IDataParserFactory dataParserFactory;
private ARecordType recordType;
private Map<String, String> configuration;
private boolean isFeed;
+ private boolean logIngestionEvents;
private FileSplit[] feedLogFileSplits;
private ARecordType metaType;
- private transient FeedLogManager feedLogManager;
+ private transient IFeedLogManager feedLogManager;
@Override
public String getAlias() {
@@ -92,12 +95,14 @@
LOGGER.log(Level.INFO, "Failure restoring external objects", e);
throw HyracksDataException.create(e);
}
- if (isFeed) {
- if (feedLogManager == null) {
+ if (isFeed && feedLogManager == null) {
+ if (logIngestionEvents) {
feedLogManager =
new FeedLogManager(feedLogFileSplits[partition].getFileReference(ctx.getIoManager()).getFile());
+ feedLogManager.touch();
+ } else {
+ feedLogManager = NoOpFeedLogManager.INSTANCE;
}
- feedLogManager.touch();
}
IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
dataSourceFactory, dataParserFactory, configuration, isFeed, feedLogManager);
@@ -145,7 +150,8 @@
private void configureFeedLogManager(ICcApplicationContext appCtx)
throws HyracksDataException, AlgebricksException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
- if (isFeed) {
+ this.logIngestionEvents = ExternalDataUtils.isLogIngestionEvents(configuration);
+ if (logIngestionEvents) {
//TODO(partitioning) make this code reuse DataPartitioningProvider
feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDatasetDataverse(configuration),
ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index f959f8d..43e822e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -21,13 +21,13 @@
import java.io.InputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class AsterixInputStream extends InputStream {
protected AbstractFeedDataFlowController controller;
- protected FeedLogManager logManager;
+ protected IFeedLogManager logManager;
protected IStreamNotificationHandler notificationHandler;
public abstract boolean stop() throws Exception;
@@ -40,7 +40,7 @@
}
// TODO: Find a better way to send notifications
- public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
+ public void setFeedLogManager(IFeedLogManager logManager) throws HyracksDataException {
this.logManager = logManager;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 0e6ddb2..90dc6c4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -26,7 +26,7 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -71,7 +71,7 @@
*
* @throws HyracksDataException
*/
- public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException;
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException;
/**
* gives the record reader a chance to recover from IO errors during feed intake
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index bd422ef..2dcb2e4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -23,7 +23,7 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -33,11 +33,11 @@
protected final IHyracksTaskContext ctx;
protected final int numOfFields;
protected final ArrayTupleBuilder tb;
- protected final FeedLogManager feedLogManager;
+ protected final IFeedLogManager feedLogManager;
protected boolean flushing;
protected long incomingRecordsCount = 0;
- public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfFields) {
+ public AbstractFeedDataFlowController(IHyracksTaskContext ctx, IFeedLogManager feedLogManager, int numOfFields) {
this.feedLogManager = feedLogManager;
this.numOfFields = numOfFields;
this.ctx = ctx;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index b14722b..7c48035 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -21,7 +21,7 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordWithPKDataParser;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -30,7 +30,7 @@
private final IRecordWithPKDataParser<T> dataParser;
- public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager,
+ public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final IFeedLogManager feedLogManager,
final int numOfOutputFields, final IRecordWithPKDataParser<T> dataParser,
final IRecordReader<T> recordReader) throws HyracksDataException {
super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 621397b..a873060 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -21,14 +21,14 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordWithMetadataParser;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlowController<T> {
- public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager,
+ public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final IFeedLogManager feedLogManager,
final int numOfOutputFields, final IRecordWithMetadataParser<T> dataParser,
final IRecordReader<T> recordReader) throws HyracksDataException {
super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 4279ebd..ed46471 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -28,7 +28,7 @@
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -59,7 +59,7 @@
protected State state = State.CREATED;
protected long failedRecordsCount = 0;
- public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields,
+ public FeedRecordDataFlowController(IHyracksTaskContext ctx, IFeedLogManager feedLogManager, int numOfOutputFields,
IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException {
super(ctx, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 9f4a3b0..1f0a135 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -22,7 +22,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -33,7 +33,7 @@
private final IStreamDataParser dataParser;
private final AsterixInputStream stream;
- public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager,
+ public FeedStreamDataFlowController(IHyracksTaskContext ctx, IFeedLogManager feedLogManager,
IStreamDataParser streamParser, AsterixInputStream inputStream) {
super(ctx, feedLogManager, 1);
this.dataParser = streamParser;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 289c16f..74bfe26 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -21,7 +21,7 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordWithMetadataParser;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -30,8 +30,9 @@
protected final IRecordWithMetadataParser<T> dataParser;
- public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields,
- IRecordWithMetadataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException {
+ public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, IFeedLogManager feedLogManager,
+ int numOfOutputFields, IRecordWithMetadataParser<T> dataParser, IRecordReader<T> recordReader)
+ throws HyracksDataException {
super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
this.dataParser = dataParser;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java
index 0d498b8..11241f1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSRecordReader.java
@@ -24,7 +24,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -151,7 +151,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) {
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
index 81ad5ba..e70e4a4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/http/HttpServerRecordReader.java
@@ -28,7 +28,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
@@ -98,7 +98,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) {
// do nothing
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
index 1f97300..d98baa5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
@@ -29,7 +29,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -135,7 +135,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) {
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 1e003a1..937f3fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -34,7 +34,7 @@
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -45,7 +45,7 @@
protected int bufferLength = 0;
protected int bufferPosn = 0;
protected boolean done = false;
- protected FeedLogManager feedLogManager;
+ protected IFeedLogManager feedLogManager;
private Supplier<String> dataSourceName = EMPTY_STRING;
private Supplier<String> previousDataSourceName = EMPTY_STRING;
@@ -91,7 +91,7 @@
public abstract boolean hasNext() throws IOException;
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException {
this.feedLogManager = feedLogManager;
reader.setFeedLogManager(feedLogManager);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 90662b2..238ee30 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -25,7 +25,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import twitter4j.Query;
@@ -97,7 +97,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) {
// do nothing
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 0f93914..11a956e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -25,7 +25,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.CharArrayRecord;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.asterix.external.util.TwitterUtil;
import twitter4j.FilterQuery;
@@ -101,7 +101,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) {
// do nothing
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index 81da0b0..2dc323e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -27,7 +27,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class AsterixInputStreamReader extends Reader {
@@ -60,7 +60,7 @@
in.setController(controller);
}
- public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException {
in.setFeedLogManager(feedLogManager);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index ad762bf..b6cfc54 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -27,8 +27,8 @@
import java.io.IOException;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
-import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -51,7 +51,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
+ public void setFeedLogManager(IFeedLogManager logManager) throws HyracksDataException {
super.setFeedLogManager(logManager);
watcher.setFeedLogManager(logManager);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index c531509..8902e82 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -45,7 +45,7 @@
import org.apache.asterix.external.dataflow.RecordDataFlowController;
import org.apache.asterix.external.dataflow.StreamDataFlowController;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -59,7 +59,7 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
- Map<String, String> configuration, boolean isFeed, FeedLogManager feedLogManager)
+ Map<String, String> configuration, boolean isFeed, IFeedLogManager feedLogManager)
throws HyracksDataException {
try {
switch (dataSourceFactory.getDataSourceType()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 5a5994e..0080e9b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -90,6 +90,7 @@
public static final String KEY_RSS_URL = "url";
public static final String KEY_INTERVAL = "interval";
public static final String KEY_IS_FEED = "is-feed";
+ public static final String KEY_LOG_INGESTION_EVENTS = "log-ingestion-events";
public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
public static final String KEY_FEED_NAME = "feed";
// a string representing external bucket name
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index cb47c5d..35e68ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -323,10 +323,22 @@
}
}
+ public static boolean isLogIngestionEvents(Map<String, String> configuration) {
+ if (!isFeed(configuration)) {
+ return false;
+ }
+ if (!configuration.containsKey(ExternalDataConstants.KEY_LOG_INGESTION_EVENTS)) {
+ return true;
+ } else {
+ return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_LOG_INGESTION_EVENTS));
+ }
+ }
+
public static void prepareFeed(Map<String, String> configuration, DataverseName dataverseName, String feedName) {
if (!configuration.containsKey(ExternalDataConstants.KEY_IS_FEED)) {
configuration.put(ExternalDataConstants.KEY_IS_FEED, ExternalDataConstants.TRUE);
}
+ configuration.computeIfAbsent(ExternalDataConstants.KEY_LOG_INGESTION_EVENTS, k -> ExternalDataConstants.TRUE);
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE, dataverseName.getCanonicalForm());
configuration.put(ExternalDataConstants.KEY_FEED_NAME, feedName);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index 282b5366..4908475 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -20,7 +20,6 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
-import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -37,7 +36,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.LogRedactionUtil;
-public class FeedLogManager implements Closeable {
+public class FeedLogManager implements IFeedLogManager {
public enum LogEntryType {
START, // partition start
@@ -76,6 +75,7 @@
}
}
+ @Override
public synchronized void touch() {
count++;
}
@@ -85,6 +85,7 @@
completed.add(currentPartition);
}
+ @Override
public synchronized void endPartition(String partition) throws IOException {
currentPartition = partition;
logProgress(END_PREFIX + currentPartition);
@@ -96,11 +97,11 @@
logProgress(START_PREFIX + currentPartition);
}
- public boolean exists() {
+ private boolean exists() {
return Files.exists(dir);
}
- public synchronized void open() throws IOException {
+ private synchronized void open() throws IOException {
// read content of logs.
try (BufferedReader reader = Files.newBufferedReader(
Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME))) {
@@ -135,7 +136,7 @@
recordLogger.close();
}
- public synchronized boolean create() throws IOException {
+ private synchronized boolean create() throws IOException {
File f = dir.toFile();
f.mkdirs();
new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
@@ -150,7 +151,8 @@
return true;
}
- private synchronized void logProgress(String log) throws IOException {
+ @Override
+ public synchronized void logProgress(String log) throws IOException {
stringBuilder.setLength(0);
stringBuilder.append(df.format((new Date())));
stringBuilder.append(' ');
@@ -160,6 +162,7 @@
progressLogger.flush();
}
+ @Override
public synchronized void logError(String error, Throwable th) throws IOException {
stringBuilder.setLength(0);
stringBuilder.append(df.format((new Date())));
@@ -172,6 +175,7 @@
errorLogger.flush();
}
+ @Override
public synchronized void logRecord(String record, String errorMessage) throws IOException {
stringBuilder.setLength(0);
stringBuilder.append(LogRedactionUtil.userData(record));
@@ -188,6 +192,7 @@
return log.substring(PREFIX_SIZE);
}
+ @Override
public synchronized boolean isSplitRead(String split) {
return completed.contains(split);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 919b74f..54369fb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -49,7 +49,7 @@
private final LinkedList<File> files = new LinkedList<File>();
private Iterator<File> it;
private final String expression;
- private FeedLogManager logManager;
+ private IFeedLogManager logManager;
private final List<Path> paths;
private final boolean isFeed;
private boolean done;
@@ -66,7 +66,7 @@
}
}
- public synchronized void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ public synchronized void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException {
if (logManager == null) {
this.logManager = feedLogManager;
init();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IFeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IFeedLogManager.java
new file mode 100644
index 0000000..7b2f247
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IFeedLogManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Contract for recording feed ingestion events: progress entries, errors, and records with an error message.
+ * Implemented by {@link FeedLogManager} and by {@link NoOpFeedLogManager}, whose methods do nothing.
+ */
+public interface IFeedLogManager extends Closeable {
+    /** Registers one unit of feed activity; {@link FeedLogManager} counts these calls. */
+    void touch();
+
+    /**
+     * Records that the given partition has ended.
+     *
+     * @param partition the partition that ended
+     * @throws IOException if the event cannot be logged
+     */
+    void endPartition(String partition) throws IOException;
+
+    /**
+     * Writes an entry to the progress log.
+     *
+     * @param log the progress message
+     * @throws IOException if the entry cannot be written
+     */
+    void logProgress(String log) throws IOException;
+
+    /**
+     * Writes an entry to the error log.
+     *
+     * @param error the error message
+     * @param th the throwable associated with the error
+     * @throws IOException if the entry cannot be written
+     */
+    void logError(String error, Throwable th) throws IOException;
+
+    /**
+     * Logs a record together with an error message.
+     *
+     * @param record the record content
+     * @param errorMessage the message to log with the record
+     * @throws IOException if the entry cannot be written
+     */
+    void logRecord(String record, String errorMessage) throws IOException;
+
+    /**
+     * Tells whether the given split is recorded as completed.
+     *
+     * @param split the split identifier
+     * @return {@code true} if the split is recorded as completed, {@code false} otherwise
+     */
+    boolean isSplitRead(String split);
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NoOpFeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NoOpFeedLogManager.java
new file mode 100644
index 0000000..6b2743f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NoOpFeedLogManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.IOException;
+
+/**
+ * An {@link IFeedLogManager} whose logging methods do nothing and that never reports a split as read.
+ * A shared instance is available as {@link #INSTANCE}.
+ * NOTE(review): presumably substituted for {@link FeedLogManager} when ingestion event logging is
+ * disabled; the selection logic is not in this file, so confirm against the caller.
+ */
+public class NoOpFeedLogManager implements IFeedLogManager {
+
+    /** Shared instance; this class declares no instance fields. */
+    public static final NoOpFeedLogManager INSTANCE = new NoOpFeedLogManager();
+
+    /** Intentionally does nothing. */
+    @Override
+    public void touch() {
+        //no op
+    }
+
+    /** Intentionally does nothing; the partition is not recorded. */
+    @Override
+    public void endPartition(String partition) throws IOException {
+        //no op
+    }
+
+    /** Intentionally does nothing; the progress message is discarded. */
+    @Override
+    public void logProgress(String log) throws IOException {
+        //no op
+    }
+
+    /** Intentionally does nothing; the error is discarded. */
+    @Override
+    public void logError(String error, Throwable th) throws IOException {
+        //no op
+    }
+
+    /** Intentionally does nothing; the record is discarded. */
+    @Override
+    public void logRecord(String record, String errorMessage) throws IOException {
+        //no op
+    }
+
+    /**
+     * Always answers {@code false}, since this implementation keeps no record of completed splits.
+     *
+     * @return {@code false} for every split
+     */
+    @Override
+    public boolean isSplitRead(String split) {
+        return false;
+    }
+
+    /** Intentionally does nothing; this class holds no resources to release. */
+    @Override
+    public void close() throws IOException {
+        //no op
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java
index 8e6f346..4332004 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/TestAsterixMembersReader.java
@@ -26,7 +26,7 @@
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.input.record.RecordWithPK;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -83,7 +83,7 @@
}
@Override
- public void setFeedLogManager(final FeedLogManager feedLogManager) {
+ public void setFeedLogManager(final IFeedLogManager feedLogManager) {
}
@Override
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index ec1db67..e0cbb88 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -27,7 +27,7 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -187,7 +187,7 @@
}
@Override
- public void setFeedLogManager(final FeedLogManager feedLogManager) {
+ public void setFeedLogManager(final IFeedLogManager feedLogManager) {
}
@Override