Improve External Data

This change enable an adapter to specify its data parser and not have
the user specify it and pass it as part of the adapter configurations.
In addition, it introduces a new parameter "parser-factory" that can be
used to specifies a parser factory instead of using parser to specify
a parser factory.

Change-Id: Iae2560c73fa63e9454f731b8e893ae779a2ac7d9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/906
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index a0060b4..327154f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -234,7 +234,12 @@
             if (metaTypeName == null) {
                 throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
             }
-            metaType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), metaTypeName);
+            String dataverseName = aqlId.getDataverseName();
+            if (metaTypeName.contains(".")) {
+                dataverseName = metaTypeName.substring(0, metaTypeName.indexOf('.'));
+                metaTypeName = metaTypeName.substring(metaTypeName.indexOf('.') + 1);
+            }
+            metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
         }
         // Is a change feed?
         List<IAType> pkTypes = null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index a807515..7be79fd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -96,7 +96,7 @@
                     }
                 } catch (Exception e) {
                     if (LOGGER.isEnabledFor(Level.FATAL)) {
-                        LOGGER.fatal("Exception in executing " + request);
+                        LOGGER.fatal("Exception in executing " + request, e);
                     }
                 }
             }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 6c5f70a..3d91c7d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -861,7 +861,7 @@
                         if (subType.isOpen()) {
                             isOpen = true;
                             break;
-                        } ;
+                        };
                     }
                 }
                 if (fieldExpr.second == null) {
@@ -1379,8 +1379,7 @@
                 }
             }
 
-            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList =
-                    new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
+            Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 // prepare job spec(s) that would disconnect any active feeds involving the dataset.
                 List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
@@ -1950,7 +1949,7 @@
 
     private JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt)
-            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -1999,7 +1998,7 @@
                 default:
                     throw new IllegalStateException();
             }
-
+            FeedMetadataUtil.validateFeed(feed, mdTxnCtx);
             MetadataManager.INSTANCE.addFeed(metadataProvider.getMetadataTxnContext(), feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -2260,7 +2259,7 @@
      */
     private Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws MetadataException {
+                    throws MetadataException {
         IFeedJoint sourceFeedJoint = null;
         FeedConnectionRequest request = null;
         List<String> functionsToApply = new ArrayList<String>();
@@ -2920,7 +2919,7 @@
     private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
             RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
             String datasetNameTo, MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException, AsterixException, Exception {
+                    throws AlgebricksException, AsterixException, Exception {
         // Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
         if (fromDataset == null) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql
index 31d6ea8..2c74ac7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.1.ddl.aql
@@ -31,11 +31,9 @@
 
 create type KVMetaType as open{
 "key":string,
-bucket:string,
 vbucket:int32,
 seq:int64,
 cas:int64,
-creationTime:int64,
 expiration:int32,
 flags:int32,
 revSeq:int64,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql
index 69f87d5..171c067 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_01/feeds_01.1.ddl.aql
@@ -37,5 +37,5 @@
 primary key id;
 
 create feed TweetFeed
-using file_feed
-(("output-type-name"="TweetType"),("fs"="localfs"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
+using localfs
+(("type-name"="TweetType"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
index 3b1595a..5a2a712 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
@@ -42,6 +42,6 @@
 }
 
 create feed TweetFeed
-using file_feed
-(("output-type-name"="TweetType"),("fs"="localfs"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
+using localfs
+(("type-name"="TweetType"),("path"="asterix_nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
 apply function feed_processor;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index 280974e..ff53e4c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "Timestamp": "Wed Jun 01 19:07:55 PDT 2016" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "localfs", "AdapterConfiguration": {{ { "Name": "type-name", "Value": "TweetType" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "is-feed", "Value": "true" }, { "Name": "dataverse", "Value": "feeds" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "reader", "Value": "localfs" }, { "Name": "parser", "Value": "adm" } }} }, "Timestamp": "Mon Jun 06 01:59:57 AST 2016" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
index 2581941..8827fca 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_03/feeds_03.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": "feed_processor", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "file_feed", "AdapterConfiguration": {{ { "Name": "output-type-name", "Value": "TweetType" }, { "Name": "fs", "Value": "localfs" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" } }} }, "Timestamp": "Wed Jun 01 19:07:55 PDT 2016" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "Function": "feed_processor", "FeedType": "PRIMARY", "PrimaryTypeDetails": { "AdapterName": "localfs", "AdapterConfiguration": {{ { "Name": "type-name", "Value": "TweetType" }, { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "is-feed", "Value": "true" }, { "Name": "dataverse", "Value": "feeds" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "reader", "Value": "localfs" }, { "Name": "parser", "Value": "adm" } }} }, "Timestamp": "Mon Jun 06 02:04:07 AST 2016" }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 13acfc9..be663e3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -105,6 +105,7 @@
       <compilation-unit name="twitter-feed">
         <output-dir compare="Text">twitter-feed</output-dir>
         <expected-error>One or more parameters are missing from adapter configuration</expected-error>
+        <expected-error>Unknown source feed</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
@@ -6378,13 +6379,13 @@
     <test-case FilePath="load">
       <compilation-unit name="issue14_query">
         <output-dir compare="Text">issue14_query</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Unspecified parameter: format</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="issue315_query">
         <output-dir compare="Text">none</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Invalid path</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index dc4a17d..1bc4ffe 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6400,13 +6400,13 @@
     <test-case FilePath="load">
       <compilation-unit name="issue14_query">
         <output-dir compare="Text">issue14_query</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Unspecified parameter: format</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
       <compilation-unit name="issue315_query">
         <output-dir compare="Text">none</output-dir>
-        <expected-error>The parameter format must be specified</expected-error>
+        <expected-error>Invalid path</expected-error>
       </compilation-unit>
     </test-case>
     <test-case FilePath="load">
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 7e82491..c08d6a4 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -293,7 +293,7 @@
     <dependency>
       <groupId>com.couchbase.client</groupId>
       <artifactId>core-io</artifactId>
-      <version>1.2.7</version>
+      <version>1.2.8</version>
     </dependency>
     <dependency>
       <groupId>io.reactivex</groupId>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d3abd50..22f046f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -94,8 +94,9 @@
             }
             feedLogManager.touch();
         }
-        IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
-                dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
+        IDataFlowController controller =
+                DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
+                        dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
         if (isFeed) {
             return new FeedAdapter((AbstractFeedDataFlowController) controller);
         } else {
@@ -122,17 +123,17 @@
     }
 
     @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+    public void configure(Map<String, String> configuration)
             throws AsterixException {
-        this.recordType = outputType;
-        this.metaType = metaType;
         this.configuration = configuration;
+        ExternalDataUtils.validateDataSourceParameters(configuration);
         dataSourceFactory = DatasourceFactoryProvider.getExternalDataSourceFactory(configuration);
-        dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
         if (dataSourceFactory.isIndexible() && (files != null)) {
             ((IIndexibleExternalDataSource) dataSourceFactory).setSnapshot(files, indexingOp);
         }
         dataSourceFactory.configure(configuration);
+        ExternalDataUtils.validateDataParserParameters(configuration);
+        dataParserFactory = ParserFactoryProvider.getDataParserFactory(configuration);
         dataParserFactory.setRecordType(recordType);
         dataParserFactory.setMetaType(metaType);
         dataParserFactory.configure(configuration);
@@ -159,7 +160,22 @@
     }
 
     @Override
-    public ARecordType getAdapterOutputType() {
+    public ARecordType getOutputType() {
         return recordType;
     }
+
+    @Override
+    public void setOutputType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    @Override
+    public ARecordType getMetaType() {
+        return metaType;
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        this.metaType = metaType;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 59a7514..505c4b2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -66,22 +66,16 @@
 
     /**
      * @param configuration
-     * @param outputType
-     * @param metaType
      * @throws Exception
      */
-    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType)
+    public void configure(Map<String, String> configuration)
             throws AsterixException;
 
-    public default void configure(final Map<String, String> configuration, final ARecordType outputType)
-            throws AsterixException {
-        configure(configuration, outputType, null);
-    }
+    public void setOutputType(ARecordType outputType);
 
-    /**
-     * Gets the record type associated with the output of the adapter
-     *
-     * @return
-     */
-    public ARecordType getAdapterOutputType();
+    public void setMetaType(ARecordType metaType);
+
+    public ARecordType getOutputType();
+
+    public ARecordType getMetaType();
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetadataParser.java
similarity index 81%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetadataParser.java
index 4b97e8d..88120ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetaDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordWithMetadataParser.java
@@ -21,6 +21,10 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-public interface IRecordWithMetaDataParser<T> extends IRecordDataParser<T> {
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+
+public interface IRecordWithMetadataParser<T> extends IRecordDataParser<T> {
     public void parseMeta(DataOutput out) throws IOException;
+
+    void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws IOException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index aac7be2..b47d278 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -22,23 +22,23 @@
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
+public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlowController<T> {
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader)
-            throws HyracksDataException {
+            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
+                    throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
     }
 
     @Override
     protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record) throws IOException {
-        dataParser.appendPK(tb);
+        ((IRecordWithMetadataParser<T>) dataParser).appendLastParsedPrimaryKeyToTuple(tb);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index e7c396b..44aac60 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -22,27 +22,23 @@
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> {
-
-    //This field mask a super class field dataParser. We do this to avoid down-casting when calling parseMeta
-    protected RecordWithMetadataParser<T, O> dataParser;
+public class FeedWithMetaDataFlowController<T> extends FeedRecordDataFlowController<T> {
 
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser,
+            FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
             IRecordReader<T> recordReader) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
-        this.dataParser = dataParser;
     }
 
     @Override
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
-        dataParser.parseMeta(tb.getDataOutput());
+        ((IRecordWithMetadataParser<T>) dataParser).parseMeta(tb.getDataOutput());
         tb.addFieldEndOffset();
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
index ca6725f..3ed6af8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
@@ -154,7 +154,7 @@
     }
 
     @Override
-    public void appendPk(final ArrayTupleBuilder tb) throws IOException {
+    public void appendPrimaryKeyToTuple(final ArrayTupleBuilder tb) throws IOException {
         for (int i = 0; i < pkIndexes.length; i++) {
             if (keyIndicator[i] == 1) {
                 tb.addField(getMetadata(pkIndexes[i]));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
index b99d4d5..02ab028 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithPK.java
@@ -86,7 +86,7 @@
         record.set(t);
     }
 
-    public void appendPk(final ArrayTupleBuilder tb) throws IOException {
+    public void appendPrimaryKeyToTuple(final ArrayTupleBuilder tb) throws IOException {
         for (final ArrayBackedValueStorage pkStorage : pkFieldValueBuffers) {
             tb.addField(pkStorage);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
index 1d9311e..dc93533 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPConverterFactory.java
@@ -51,7 +51,7 @@
 
     @Override
     public IRecordConverter<DCPRequest, RecordWithMetadataAndPK<char[]>> createConverter() {
-        return new DCPRequestToRecordWithMetadataAndPKConverter();
+        return new DCPMessageToRecordConverter();
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPMessageToRecordConverter.java
similarity index 68%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPMessageToRecordConverter.java
index 724699c..6ce5e98 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPRequestToRecordWithMetadataAndPKConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/converter/DCPMessageToRecordConverter.java
@@ -39,7 +39,7 @@
 import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
 import com.couchbase.client.deps.io.netty.util.ReferenceCountUtil;
 
-public class DCPRequestToRecordWithMetadataAndPKConverter
+public class DCPMessageToRecordConverter
         implements IRecordToRecordWithMetadataAndPKConverter<DCPRequest, char[]> {
 
     private final RecordWithMetadataAndPK<char[]> recordWithMetadata;
@@ -47,18 +47,17 @@
     private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
     private final ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
     private final CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
-    // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long),
-    // creationTime(long),expiration(int32),flags(int32),revSeqNumber(long),lockTime(int32)}
-    private static final IAType[] CB_META_TYPES = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
-            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
-            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
+    private static final IAType[] CB_META_TYPES = new IAType[] { /*ID*/BuiltinType.ASTRING,
+            /*VBID*/BuiltinType.AINT32, /*SEQ*/BuiltinType.AINT64, /*CAS*/BuiltinType.AINT64,
+            /*EXPIRATION*/BuiltinType.AINT32,
+            /*FLAGS*/BuiltinType.AINT32, /*REV*/BuiltinType.AINT64, /*LOCK*/BuiltinType.AINT32 };
     private static final int[] PK_INDICATOR = { 1 };
     private static final int[] PK_INDEXES = { 0 };
     private static final IAType[] PK_TYPES = { BuiltinType.ASTRING };
 
-    public DCPRequestToRecordWithMetadataAndPKConverter() {
+    public DCPMessageToRecordConverter() {
         this.value = new CharArrayRecord();
-        this.recordWithMetadata = new RecordWithMetadataAndPK<char[]>(value, CB_META_TYPES,
+        this.recordWithMetadata = new RecordWithMetadataAndPK<>(value, CB_META_TYPES,
                 ARecordType.FULLY_OPEN_RECORD_TYPE, PK_INDICATOR, PK_INDEXES, PK_TYPES);
     }
 
@@ -67,29 +66,29 @@
         final DCPRequest dcpRequest = input.get();
         if (dcpRequest instanceof MutationMessage) {
             final MutationMessage message = (MutationMessage) dcpRequest;
-            final String key = message.key();
-            final int vbucket = message.partition();
-            final long seq = message.bySequenceNumber();
-            final String bucket = message.bucket();
-            final long cas = message.cas();
-            final long creationTime = message.creationTime();
-            final int expiration = message.expiration();
-            final int flags = message.flags();
-            final long revSeqNumber = message.revisionSequenceNumber();
-            final int lockTime = message.lockTime();
-            recordWithMetadata.reset();
-            recordWithMetadata.setMetadata(0, key);
-            recordWithMetadata.setMetadata(1, bucket);
-            recordWithMetadata.setMetadata(2, vbucket);
-            recordWithMetadata.setMetadata(3, seq);
-            recordWithMetadata.setMetadata(4, cas);
-            recordWithMetadata.setMetadata(5, creationTime);
-            recordWithMetadata.setMetadata(6, expiration);
-            recordWithMetadata.setMetadata(7, flags);
-            recordWithMetadata.setMetadata(8, revSeqNumber);
-            recordWithMetadata.setMetadata(9, lockTime);
-            DCPRequestToRecordWithMetadataAndPKConverter.set(message.content(), decoder, bytes, chars, value);
-            ReferenceCountUtil.release(message.content());
+            try {
+                final String key = message.key();
+                final int vbucket = message.partition();
+                final long seq = message.bySequenceNumber();
+                final long cas = message.cas();
+                final int expiration = message.expiration();
+                final int flags = message.flags();
+                final long revSeqNumber = message.revisionSequenceNumber();
+                final int lockTime = message.lockTime();
+                int i = 0;
+                recordWithMetadata.reset();
+                recordWithMetadata.setMetadata(i++, key);
+                recordWithMetadata.setMetadata(i++, vbucket);
+                recordWithMetadata.setMetadata(i++, seq);
+                recordWithMetadata.setMetadata(i++, cas);
+                recordWithMetadata.setMetadata(i++, expiration);
+                recordWithMetadata.setMetadata(i++, flags);
+                recordWithMetadata.setMetadata(i++, revSeqNumber);
+                recordWithMetadata.setMetadata(i, lockTime);
+                DCPMessageToRecordConverter.set(message.content(), decoder, bytes, chars, value);
+            } finally {
+                ReferenceCountUtil.release(message.content());
+            }
         } else if (dcpRequest instanceof RemoveMessage) {
             final RemoveMessage message = (RemoveMessage) dcpRequest;
             final String key = message.key();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index b1fd7a0..58e9ea7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -110,7 +110,8 @@
                 adaptorLibraryName);
         if (classLoader != null) {
             adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance()));
-            adapterFactory.configure(adaptorConfiguration, adapterOutputType);
+            adapterFactory.setOutputType(adapterOutputType);
+            adapterFactory.configure(adaptorConfiguration);
         } else {
             String message = "Unable to create adapter as class loader not configured for library " + adaptorLibraryName
                     + " in dataverse " + feedId.getDataverse();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
index 5c46a42..be308d4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -26,7 +26,7 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordConverter;
 import org.apache.asterix.external.api.IRecordDataParser;
-import org.apache.asterix.external.api.IRecordWithMetaDataParser;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableString;
@@ -39,7 +39,7 @@
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
-public class RecordWithMetadataParser<T, O> implements IRecordWithMetaDataParser<T> {
+public class RecordWithMetadataParser<T, O> implements IRecordWithMetadataParser<T> {
 
     private final IRecordConverter<T, RecordWithMetadataAndPK<O>> converter;
     private RecordWithMetadataAndPK<O> rwm;
@@ -106,7 +106,8 @@
         }
     }
 
-    public void appendPK(ArrayTupleBuilder tb) throws IOException {
-        rwm.appendPk(tb);
+    @Override
+    public void appendLastParsedPrimaryKeyToTuple(ArrayTupleBuilder tb) throws IOException {
+        rwm.appendPrimaryKeyToTuple(tb);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index e39e3bd..8f6c85f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -28,7 +28,6 @@
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
-import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 
@@ -37,13 +36,14 @@
  */
 public class AdapterFactoryProvider {
 
-    // Internal Adapters
+    // Adapters
     public static IAdapterFactory getAdapterFactory(String adapterName, Map<String, String> configuration,
             ARecordType itemType, ARecordType metaType) throws AsterixException {
         ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
-        ExternalDataUtils.validateParameters(configuration);
         GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
-        adapterFactory.configure(configuration, itemType, metaType);
+        adapterFactory.setOutputType(itemType);
+        adapterFactory.setMetaType(metaType);
+        adapterFactory.configure(configuration);
         return adapterFactory;
     }
 
@@ -52,10 +52,11 @@
             Map<String, String> configuration, ARecordType itemType, List<ExternalFile> snapshot, boolean indexingOp,
             ARecordType metaType) throws AsterixException {
         ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
-        ExternalDataUtils.validateParameters(configuration);
         GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
+        adapterFactory.setOutputType(itemType);
+        adapterFactory.setMetaType(metaType);
         adapterFactory.setSnapshot(snapshot, indexingOp);
-        adapterFactory.configure(configuration, itemType, metaType);
+        adapterFactory.configure(configuration);
         return adapterFactory;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 50ebb71..4ad08b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.external.api.IRecordDataParserFactory;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.api.IRecordWithMetadataParser;
 import org.apache.asterix.external.api.IRecordWithPKDataParser;
 import org.apache.asterix.external.api.IStreamDataParser;
 import org.apache.asterix.external.api.IStreamDataParserFactory;
@@ -44,7 +45,6 @@
 import org.apache.asterix.external.dataflow.IndexingDataFlowController;
 import org.apache.asterix.external.dataflow.RecordDataFlowController;
 import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.parser.RecordWithMetadataParser;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedLogManager;
@@ -59,7 +59,7 @@
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
             Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         try {
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
@@ -80,10 +80,10 @@
                             if (isChangeFeed) {
                                 int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                                 return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, (RecordWithMetadataParser) dataParser, recordReader);
+                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
                             } else {
                                 return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
-                                        (RecordWithMetadataParser) dataParser, recordReader);
+                                        (IRecordWithMetadataParser) dataParser, recordReader);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 2e1a5a7..b37198a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -45,8 +45,11 @@
             return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
                     parserFactoryName);
         } else {
-            parserFactory = ParserFactoryProvider
-                    .getDataParserFactory(ExternalDataUtils.getRecordFormat(configuration));
+            String parserFactoryKey = ExternalDataUtils.getRecordFormat(configuration);
+            if (parserFactoryKey == null) {
+                parserFactoryKey = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
+            }
+            parserFactory = ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
         }
         return parserFactory;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
index ea8bc98..d11e97f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -54,7 +54,7 @@
             }
             throw new AsterixException("Unknown format: " + format);
         }
-        throw new AsterixException("Unspecified paramter: " + ExternalDataConstants.KEY_FORMAT);
+        throw new AsterixException("Unspecified parameter: " + ExternalDataConstants.KEY_FORMAT);
     }
 
     public static StreamRecordReader createRecordReader(Format format, AsterixInputStream inputStream,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index c992723..55dee04 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -55,7 +55,7 @@
     public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
     public static final String KEY_SOURCE_DATATYPE = "type-name";
     public static final String KEY_DELIMITER = "delimiter";
-    public static final String KEY_PARSER_FACTORY = "tuple-parser";
+    public static final String KEY_PARSER_FACTORY = "parser-factory";
     public static final String KEY_DATA_PARSER = "parser";
     public static final String KEY_HEADER = "header";
     public static final String KEY_READER = "reader";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index e33949e..69882d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -78,14 +78,26 @@
     }
 
     public static void validateParameters(Map<String, String> configuration) throws AsterixException {
+        validateDataSourceParameters(configuration);
+        validateDataParserParameters(configuration);
+    }
+
+    public static void validateDataParserParameters(Map<String, String> configuration) throws AsterixException {
+        String parser = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        if (parser == null) {
+            String parserFactory = configuration.get(ExternalDataConstants.KEY_PARSER_FACTORY);
+            if (parserFactory == null) {
+                throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " or "
+                        + ExternalDataConstants.KEY_PARSER_FACTORY + " must be specified.");
+            }
+        }
+    }
+
+    public static void validateDataSourceParameters(Map<String, String> configuration) throws AsterixException {
         String reader = configuration.get(ExternalDataConstants.KEY_READER);
         if (reader == null) {
             throw new AsterixException("The parameter " + ExternalDataConstants.KEY_READER + " must be specified.");
         }
-        String parser = configuration.get(ExternalDataConstants.KEY_FORMAT);
-        if (parser == null) {
-            throw new AsterixException("The parameter " + ExternalDataConstants.KEY_FORMAT + " must be specified.");
-        }
     }
 
     public static DataSourceType getDataSourceType(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
index b08fc7d..5e4e5eb 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
@@ -20,7 +20,7 @@
 
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.input.record.RecordWithMetadataAndPK;
-import org.apache.asterix.external.input.record.converter.DCPRequestToRecordWithMetadataAndPKConverter;
+import org.apache.asterix.external.input.record.converter.DCPMessageToRecordConverter;
 import org.apache.asterix.external.input.record.reader.kv.KVTestReader;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.junit.Test;
@@ -34,7 +34,7 @@
         try (KVTestReader cbreader = new KVTestReader(0, "TestBucket",
                 new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, 150, 0, 0, 0)) {
             final UTF8StringPointable pointable = new UTF8StringPointable();
-            final DCPRequestToRecordWithMetadataAndPKConverter converter = new DCPRequestToRecordWithMetadataAndPKConverter();
+            final DCPMessageToRecordConverter converter = new DCPMessageToRecordConverter();
             while (cbreader.hasNext()) {
                 final IRawRecord<DCPRequest> dcp = cbreader.next();
                 final RecordWithMetadataAndPK<char[]> record = converter.convert(dcp);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
index a5e422c..adffe1e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/ClassAdParser.java
@@ -791,6 +791,7 @@
     }
 
     public boolean parseNext(ClassAd classad) throws IOException {
+        resetPools();
         return parseClassAd(currentSource, classad, false);
     }
 
@@ -1350,7 +1351,7 @@
                     if (!parseArgumentList(argList)) {
                         tree.setInnerTree(null);
                         return false;
-                    } ;
+                    };
                     // special case function-calls should be converted
                     // into a literal expression if the argument is a
                     // string literal
@@ -1398,7 +1399,7 @@
                 tree.setInnerTree(Operation.createOperation(Operation.OpKind_PARENTHESES_OP, treeL, objectPool));
                 return (tree.size() != 0);
             }
-            // constants
+                // constants
             case LEX_OPEN_BOX: {
                 isExpr = true;
                 ClassAd newAd = objectPool.classAdPool.get();
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d1a4532..e029c09 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -77,9 +77,11 @@
                 ADMDataParser parser;
                 ITupleForwarder forwarder;
                 ArrayTupleBuilder tb;
-                IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx
-                        .getJobletContext().getApplicationContext().getControllerService()).getApplicationContext()
-                                .getApplicationObject();
+                IAsterixPropertiesProvider propertiesProvider =
+                        (IAsterixPropertiesProvider) ((NodeControllerService) ctx
+                                .getJobletContext().getApplicationContext().getControllerService())
+                                        .getApplicationContext()
+                                        .getApplicationObject();
                 ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
                         .get(nodeId)[0];
                 try {
@@ -125,13 +127,27 @@
     }
 
     @Override
-    public ARecordType getAdapterOutputType() {
+    public void configure(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void setOutputType(ARecordType outputType) {
+        this.outputType = outputType;
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+    }
+
+    @Override
+    public ARecordType getOutputType() {
         return outputType;
     }
 
     @Override
-    public void configure(Map<String, String> configuration, ARecordType outputType, ARecordType metaType) {
-        this.configuration = configuration;
-        this.outputType = outputType;
+    public ARecordType getMetaType() {
+        return null;
     }
+
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
index 3d86abd..fd617bd 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/TestRecordWithPKParser.java
@@ -43,7 +43,7 @@
     @Override
     public void appendKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends RecordWithPK<T>> record)
             throws IOException {
-        record.get().appendPk(tb);
+        record.get().appendPrimaryKeyToTuple(tb);
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index faff9df..c0c8eb6 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -121,7 +121,7 @@
                 tb.addFieldEndOffset();
                 parser.parseMeta(tb.getDataOutput());
                 tb.addFieldEndOffset();
-                parser.appendPK(tb);
+                parser.appendLastParsedPrimaryKeyToTuple(tb);
                 //print tuple
                 printTuple(tb, printers, printStream);
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 08a98a4..1a51b74 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -453,6 +454,71 @@
         return preProcessingRequired;
     }
 
+    public static void validateFeed(Feed feed, MetadataTransactionContext mdTxnCtx)
+            throws AsterixException {
+        try {
+            String adapterName = feed.getAdapterName();
+            Map<String, String> configuration = feed.getAdapterConfiguration();
+            ARecordType adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
+            ARecordType metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
+            ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
+            ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
+            // Get adapter from metadata dataset <Metadata dataverse>
+            DatasourceAdapter adapterEntity =
+                    MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                            adapterName);
+            // Get adapter from metadata dataset <The feed dataverse>
+            if (adapterEntity == null) {
+                adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
+            }
+            AdapterType adapterType;
+            IAdapterFactory adapterFactory;
+            if (adapterEntity != null) {
+                adapterType = adapterEntity.getType();
+                String adapterFactoryClassname = adapterEntity.getClassname();
+                switch (adapterType) {
+                    case INTERNAL:
+                        adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+                        break;
+                    case EXTERNAL:
+                        String[] anameComponents = adapterName.split("#");
+                        String libraryName = anameComponents[0];
+                        ClassLoader cl =
+                                ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName);
+                        adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+                        break;
+                    default:
+                        throw new AsterixException("Unknown Adapter type " + adapterType);
+                }
+                adapterFactory.setOutputType(adapterOutputType);
+                adapterFactory.setMetaType(metaType);
+                adapterFactory.configure(configuration);
+            } else {
+                AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType,
+                        metaType);
+            }
+            if (metaType == null && configuration.containsKey(ExternalDataConstants.KEY_META_TYPE_NAME)) {
+                metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
+                if (metaType == null) {
+                    throw new AsterixException("Unknown specified feed meta output data type "
+                            + configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
+                }
+            }
+            if (adapterOutputType == null) {
+                if (!configuration.containsKey(ExternalDataConstants.KEY_TYPE_NAME)) {
+                    throw new AsterixException("Unspecified feed output data type");
+                }
+                adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
+                if (adapterOutputType == null) {
+                    throw new AsterixException("Unknown specified feed output data type "
+                            + configuration.get(ExternalDataConstants.KEY_TYPE_NAME));
+                }
+            }
+        } catch (Exception e) {
+            throw new AsterixException("Invalid feed parameters", e);
+        }
+    }
+
     @SuppressWarnings("rawtypes")
     public static Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType>
             getPrimaryFeedFactoryAndOutput(Feed feed, FeedPolicyAccessor policyAccessor,
@@ -494,13 +560,26 @@
                                 ExternalLibraryManager.getLibraryClassLoader(feed.getDataverseName(), libraryName);
                         adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
                         break;
+                    default:
+                        throw new AsterixException("Unknown Adapter type " + adapterType);
                 }
-                adapterFactory.configure(configuration, adapterOutputType, metaType);
+                adapterFactory.setOutputType(adapterOutputType);
+                adapterFactory.setMetaType(metaType);
+                adapterFactory.configure(configuration);
             } else {
                 adapterFactory = AdapterFactoryProvider.getAdapterFactory(adapterName, configuration, adapterOutputType,
                         metaType);
                 adapterType = IDataSourceAdapter.AdapterType.INTERNAL;
             }
+            if (metaType == null) {
+                metaType = getOutputType(feed, configuration, ExternalDataConstants.KEY_META_TYPE_NAME);
+            }
+            if (adapterOutputType == null) {
+                if (!configuration.containsKey(ExternalDataConstants.KEY_TYPE_NAME)) {
+                    throw new AsterixException("Unspecified feed output data type");
+                }
+                adapterOutputType = getOutputType(feed, configuration, ExternalDataConstants.KEY_TYPE_NAME);
+            }
             int numOfOutputs = 1;
             if (metaType != null) {
                 numOfOutputs++;
@@ -516,27 +595,7 @@
                 serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
             }
             if (ExternalDataUtils.isChangeFeed(configuration)) {
-                int[] pkIndexes = ExternalDataUtils.getPKIndexes(configuration);
-                if (metaType != null) {
-                    int[] pkIndicators = ExternalDataUtils.getPKSourceIndicators(configuration);
-                    for (int j = 0; j < pkIndexes.length; j++) {
-                        int aInt = pkIndexes[j];
-                        if (pkIndicators[j] == 0) {
-                            serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
-                                    .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
-                        } else if (pkIndicators[j] == 1) {
-                            serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
-                                    .getSerializerDeserializer(metaType.getFieldTypes()[aInt]);
-                        } else {
-                            throw new AlgebricksException("a key source indicator can only be 0 or 1");
-                        }
-                    }
-                } else {
-                    for (int aInt : pkIndexes) {
-                        serdes[i++] = AqlSerializerDeserializerProvider.INSTANCE
-                                .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
-                    }
-                }
+                getSerdesForPKs(serdes, configuration, metaType, adapterOutputType, i);
             }
             feedProps = new Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType>(adapterFactory,
                     new RecordDescriptor(serdes), adapterType);
@@ -546,6 +605,32 @@
         return feedProps;
     }
 
+    @SuppressWarnings("rawtypes")
+    private static void getSerdesForPKs(ISerializerDeserializer[] serdes, Map<String, String> configuration,
+            ARecordType metaType, ARecordType adapterOutputType, int index) throws AlgebricksException {
+        int[] pkIndexes = ExternalDataUtils.getPKIndexes(configuration);
+        if (metaType != null) {
+            int[] pkIndicators = ExternalDataUtils.getPKSourceIndicators(configuration);
+            for (int j = 0; j < pkIndexes.length; j++) {
+                int aInt = pkIndexes[j];
+                if (pkIndicators[j] == 0) {
+                    serdes[index++] = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
+                } else if (pkIndicators[j] == 1) {
+                    serdes[index++] = AqlSerializerDeserializerProvider.INSTANCE
+                            .getSerializerDeserializer(metaType.getFieldTypes()[aInt]);
+                } else {
+                    throw new AlgebricksException("a key source indicator can only be 0 or 1");
+                }
+            }
+        } else {
+            for (int aInt : pkIndexes) {
+                serdes[index++] = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(adapterOutputType.getFieldTypes()[aInt]);
+            }
+        }
+    }
+
     public static ARecordType getOutputType(IFeed feed, Map<String, String> configuration, String key)
             throws RemoteException, ACIDException, MetadataException {
         ARecordType outputType = null;
@@ -596,7 +681,7 @@
 
     public static String getSecondaryFeedOutput(Feed feed, FeedPolicyAccessor policyAccessor,
             MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException, MetadataException, RemoteException, ACIDException {
+                    throws AlgebricksException, MetadataException, RemoteException, ACIDException {
         String outputType = null;
         String primaryFeedName = feed.getSourceFeedName();
         Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feed.getDataverseName(), primaryFeedName);