added test for installation and use of an external adaptor
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index ec8b6be..76aae15 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
@@ -37,6 +38,7 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 
 public class ConnectFeedStatement implements Statement {
 
@@ -89,10 +91,10 @@
             }
         }
 
-        org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
         try {
             factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
-            adapterOutputType = factoryOutput.getRight().getTypeName();
+            adapterOutputType = factoryOutput.second.getTypeName();
         } catch (AlgebricksException ae) {
             throw new MetadataException(ae);
         }
diff --git a/asterix-external-data/src/main/resources/schema/library.xsd b/asterix-external-data/src/main/resources/schema/library.xsd
index f58175a..00f71f5 100644
--- a/asterix-external-data/src/main/resources/schema/library.xsd
+++ b/asterix-external-data/src/main/resources/schema/library.xsd
@@ -9,9 +9,7 @@
 	<xs:element name="return_type" type="xs:string" />
 	<xs:element name="function_type" type="xs:string" />
 	<xs:element name="definition" type="xs:string" />
-
 	<xs:element name="factory_class" type="xs:string" />
-	<xs:element name="adaptor_type" type="xs:string" />
 
 
 	<!-- definition of complex elements -->
@@ -40,7 +38,6 @@
 			<xs:sequence>
 				<xs:element ref="lib:name" />
 				<xs:element ref="lib:factory_class" />
-				<xs:element ref="lib:adaptor_type" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
@@ -58,7 +55,7 @@
 			<xs:sequence>
 				<xs:element ref="lib:language" />
 				<xs:element ref="lib:libraryFunctions" minOccurs="0" />
-			    <xs:element ref="lib:libraryAdapters" minOccurs="0" />
+				<xs:element ref="lib:libraryAdapters" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
new file mode 100644
index 0000000..cb0142a
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptor extends StreamBasedAdapter implements IFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PipedOutputStream pos;
+
+    private final PipedInputStream pis;
+
+    private final Map<String, String> configuration;
+
+    private DummyGenerator generator;
+
+    public TestTypedAdaptor(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
+            Map<String, String> configuration) throws IOException {
+        super(parserFactory, sourceDatatype, ctx);
+        pos = new PipedOutputStream();
+        pis = new PipedInputStream(pos);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public InputStream getInputStream(int partition) throws IOException {
+        return pis;
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter frameWriter) throws Exception {
+        generator = new DummyGenerator(configuration, pos);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(generator);
+        super.start(partition, frameWriter);
+    }
+
+    private static class DummyGenerator implements Runnable {
+
+        private final int nOutputRecords;
+        private final OutputStream os;
+        private final byte[] EOL = "\n".getBytes();
+        private boolean continueIngestion;
+
+        public DummyGenerator(Map<String, String> configuration, OutputStream os) {
+            nOutputRecords = Integer.parseInt(configuration.get(TestTypedAdaptorFactory.KEY_NUM_OUTPUT_RECORDS));
+            this.os = os;
+            this.continueIngestion = true;
+        }
+
+        @Override
+        public void run() {
+            DummyRecord dummyRecord = new DummyRecord();
+            try {
+                int i = 0;
+                while (continueIngestion && i < nOutputRecords) {
+                    dummyRecord.reset(i + 1, "" + (i + 1));
+                    os.write(dummyRecord.toString().getBytes());
+                    os.write(EOL);
+                    i++;
+                }
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            } finally {
+                try {
+                    os.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        public void stop() {
+            continueIngestion = false;
+        }
+    }
+
+    private static class DummyRecord {
+
+        private int tweetid = 0;
+        private String text = null;
+
+        public void reset(int tweetid, String text) {
+            this.tweetid = tweetid;
+            this.text = text;
+        }
+
+        @Override
+        public String toString() {
+            return "{" + "\"tweetid\":" + "int64(" + "\"" + tweetid + "\"" + ")" + "," + "\"message-text\":" + "\""
+                    + text + "\"" + "}";
+        }
+
+    }
+
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
+    @Override
+    public void stop() throws Exception {
+        generator.stop();
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
new file mode 100644
index 0000000..39f7ab2
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptorFactory implements ITypedAdapterFactory {
+
+    public static final String NAME = "test_typed_adaptor";
+
+    private static ARecordType adapterOutputType = initOutputType();
+
+    public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
+
+    private Map<String, String> configuration;
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    private static ARecordType initOutputType() {
+        String[] fieldNames = new String[] { "tweetid", "message-text" };
+        IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
+        ARecordType outputType = null;
+        try {
+            outputType = new ARecordType("TestTypedAdaptorOutputType", fieldNames, fieldTypes, false);
+        } catch (AsterixException exception) {
+            throw new IllegalStateException("Unable to create output type for adaptor " + NAME);
+        }
+        return outputType;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
+        return new TestTypedAdaptor(tupleParserFactory, adapterOutputType, ctx, configuration);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return adapterOutputType;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
index bd485a4..a0b7bf9 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/text_functions.xml
@@ -50,4 +50,10 @@
 			</definition>
 		</libraryFunction>
 	</libraryFunctions>
+	<libraryAdapters>
+		<libraryAdapter>
+			<name>test_typed_adaptor</name>
+			<factory_class>edu.uci.ics.asterix.external.library.adaptor.TestTypedAdaptorFactory</factory_class>
+		</libraryAdapter>
+	</libraryAdapters>
 </externalLibrary>
diff --git a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
index a6be981..9ef80ea 100644
--- a/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
+++ b/asterix-installer/src/test/java/edu/uci/ics/asterix/installer/test/AsterixExternalLibraryIT.java
@@ -59,7 +59,9 @@
     @Test
     public void test() throws Exception {
         for (TestCaseContext testCaseCtx : testCaseCollection) {
-            TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null);
+            if (testCaseCtx.getTestCase().getCompilationUnit().get(0).getName().contains("adapt")) {
+                TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null);
+            }
         }
     }
 
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
new file mode 100644
index 0000000..0c2df82
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter.
+                  The feed simulator simulates feed from a file in the local fs.
+                  Associate with the feed an external user-defined function. The UDF 
+                  finds topics in each tweet. A topic is identified by a #. 
+                  Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date         : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+create type TestTypedAdaptorOutputType as closed {
+  tweetid: int64,
+  message-text: string
+}
+
+create dataset Tweets(TestTypedAdaptorOutputType)
+primary key tweetid;
+
+create feed TestTypedAdaptorFeed
+using "externallibtest#test_typed_adaptor" (("num_output_records"="5"),("type-name"="TestTypedAdaptorOutputType"));
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
new file mode 100644
index 0000000..eb1bc9a
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
@@ -0,0 +1,14 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter.
+                  The feed simulator simulates feed from a file in the local fs.
+                  Associate with the feed an external user-defined function. The UDF 
+                  finds topics in each tweet. A topic is identified by a #. 
+                  Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date         : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+set wait-for-completion-feed "true";
+
+connect feed TestTypedAdaptorFeed to dataset Tweets;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
new file mode 100644
index 0000000..9729a84
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter.
+                  The feed simulator simulates feed from a file in the local fs.
+                  Associate with the feed an external user-defined function. The UDF 
+                  finds topics in each tweet. A topic is identified by a #. 
+                  Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date         : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+for $x in dataset Tweets
+order by $x.tweetid
+return $x
diff --git a/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
new file mode 100644
index 0000000..2ad7b60
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
@@ -0,0 +1,5 @@
+{ "tweetid": 1i64, "message-text": "1" }
+{ "tweetid": 2i64, "message-text": "2" }
+{ "tweetid": 3i64, "message-text": "3" }
+{ "tweetid": 4i64, "message-text": "4" }
+{ "tweetid": 5i64, "message-text": "5" }
diff --git a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
index 65780d9..be9ba0e 100644
--- a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
@@ -45,5 +45,12 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="library-adapters">
+    <test-case FilePath="library-adapters">
+      <compilation-unit name="typed_adapter">
+        <output-dir compare="Text">typed_adapter</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 1dc220a..5a59af6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -52,6 +53,7 @@
 import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
@@ -96,6 +98,7 @@
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
@@ -414,13 +417,14 @@
 
         FeedDataSource feedDataSource = (FeedDataSource) dataSource;
         FeedIntakeOperatorDescriptor feedIngestor = null;
-        org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
         AlgebricksPartitionConstraint constraint = null;
 
         try {
             factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
-            IAdapterFactory adapterFactory = factoryOutput.getLeft();
-            ARecordType adapterOutputType = factoryOutput.getRight();
+            IAdapterFactory adapterFactory = factoryOutput.first;
+            ARecordType adapterOutputType = factoryOutput.second;
+            AdapterType adapterType = factoryOutput.third;
 
             ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
                     .getSerializerDeserializer(adapterOutputType);
@@ -432,12 +436,25 @@
                 throw new AlgebricksException("Feed not configured with a policy");
             }
             feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
-            feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
-                    feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
-                            .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
-                    feedDesc, feedPolicy.getProperties());
-
-            constraint = factoryOutput.getLeft().getPartitionConstraint();
+            switch (adapterType) {
+                case INTERNAL:
+                    feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+                            feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+                                    .getFeedConnectionId().getDatasetName()), adapterFactory,
+                            (ARecordType) adapterOutputType, feedDesc, feedPolicy.getProperties());
+                    break;
+                case EXTERNAL:
+                    String libraryName = feedDataSource.getFeed().getAdaptorName().split("#")[0];
+                    feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
+                            libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
+                                    .getAdaptorConfiguration(), (ARecordType) adapterOutputType, feedDesc,
+                            feedPolicy.getProperties());
+                    break;
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Cofigured feed intake operator with " + adapterType + " adapter");
+            }
+            constraint = factoryOutput.first.getPartitionConstraint();
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1029,7 +1046,8 @@
             int datasetId = dataset.getDatasetId();
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
-                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                    ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
@@ -1155,7 +1173,8 @@
             int datasetId = dataset.getDatasetId();
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
-                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
+                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                    ResourceType.LSM_INVERTED_INDEX);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
@@ -1246,7 +1265,8 @@
             int datasetId = dataset.getDatasetId();
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
-                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                    ResourceType.LSM_RTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 859ed11..b732191 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
 import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -44,7 +45,7 @@
     private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
 
     /** The type associated with the ADM data output from the feed adaptor */
-    private final IAType outptuType;
+    private final IAType outputType;
 
     /** unique identifier for a feed instance. */
     private final FeedConnectionId feedId;
@@ -58,12 +59,39 @@
     /** The (singleton) instance of IFeedManager **/
     private IFeedManager feedManager;
 
+    /** The library that contains the adapter in use. **/
+    private String adapterLibraryName;
+
+    /**
+     * The adapter factory class that is used to create an instance of the feed adapter.
+     * This value is used only in the case of external adapters.
+     **/
+    private String adapterFactoryClassName;
+
+    /** The configuration parameters associated with the adapter. **/
+    private Map<String, String> adapterConfiguration;
+
+    private ARecordType adapterOutputType;
+
     public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
             ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
         this.adapterFactory = adapterFactory;
-        this.outptuType = atype;
+        this.outputType = atype;
+        this.feedId = feedId;
+        this.feedPolicy = feedPolicy;
+    }
+
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, String adapterLibraryName,
+            String adapterFactoryClassName, Map<String, String> configuration, ARecordType atype,
+            RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactoryClassName = adapterFactoryClassName;
+        this.adapterConfiguration = configuration;
+        this.adapterLibraryName = adapterLibraryName;
+        this.outputType = atype;
         this.feedId = feedId;
         this.feedPolicy = feedPolicy;
     }
@@ -80,7 +108,7 @@
         try {
             if (ingestionRuntime == null) {
                 // create an instance of a feed adaptor to ingest data.
-                adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+                adapter = createAdapter(ctx, partition);
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Beginning new feed:" + feedId);
                 }
@@ -113,10 +141,68 @@
     }
 
     public IAType getOutputType() {
-        return outptuType;
+        return outputType;
     }
 
     public RecordDescriptor getRecordDescriptor() {
         return recordDescriptors[0];
     }
+
+    private IFeedAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        IFeedAdapter feedAdapter = null;
+        if (adapterFactory != null) {
+            feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+        } else {
+            ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+                    adapterLibraryName);
+            if (classLoader != null) {
+                IAdapterFactory adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adapterFactoryClassName)
+                        .newInstance()));
+
+                switch (adapterFactory.getAdapterType()) {
+                    case TYPED: {
+                        ((ITypedAdapterFactory) adapterFactory).configure(adapterConfiguration);
+                        feedAdapter = (IFeedAdapter) ((ITypedAdapterFactory) adapterFactory).createAdapter(ctx,
+                                partition);
+                    }
+                        break;
+                    case GENERIC: {
+                        String outputTypeName = adapterConfiguration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
+                        if (outputTypeName == null) {
+                            throw new IllegalArgumentException(
+                                    "You must specify the datatype associated with the incoming data. Datatype is specified by the "
+                                            + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
+                        }
+                        ((IGenericAdapterFactory) adapterFactory).configure(adapterConfiguration,
+                                (ARecordType) adapterOutputType);
+                        ((IGenericAdapterFactory) adapterFactory).createAdapter(ctx, partition);
+                    }
+                        break;
+                }
+
+                feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+            } else {
+                String message = "Unable to create adapter as class loader not configured for library "
+                        + adapterLibraryName + " in dataverse " + feedId.getDataverse();
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe(message);
+                }
+                throw new IllegalArgumentException(message);
+
+            }
+        }
+        return feedAdapter;
+    }
+
+    public String getAdapterLibraryName() {
+        return adapterLibraryName;
+    }
+
+    public String getAdapterFactoryClassName() {
+        return adapterFactoryClassName;
+    }
+
+    public Map<String, String> getAdapterConfiguration() {
+        return adapterConfiguration;
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index e3d5bed..535c6d8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -79,9 +80,15 @@
             IOperatorDescriptor opDesc = entry.getValue();
             if (opDesc instanceof FeedIntakeOperatorDescriptor) {
                 FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
-                FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
-                        orig.getAdapterFactory(), (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(),
-                        orig.getFeedPolicy());
+                FeedIntakeOperatorDescriptor fiop;
+                if (orig.getAdapterFactory() != null) {
+                    fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterFactory(),
+                            (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+                } else {
+                    fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterLibraryName(),
+                            orig.getAdapterFactoryClassName(), orig.getAdapterConfiguration(),
+                            (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+                }
                 oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
             } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
                 FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
@@ -196,7 +203,7 @@
 
     }
 
-    public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+    public static Triple<IAdapterFactory, ARecordType, AdapterType> getFeedFactoryAndOutput(Feed feed,
             MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
 
         String adapterName = null;
@@ -204,7 +211,7 @@
         String adapterFactoryClassname = null;
         IAdapterFactory adapterFactory = null;
         ARecordType adapterOutputType = null;
-        Pair<IAdapterFactory, ARecordType> feedProps = null;
+        Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
         try {
             adapterName = feed.getAdaptorName();
             adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -258,7 +265,7 @@
                     throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
 
-            feedProps = Pair.of(adapterFactory, adapterOutputType);
+            feedProps = new Triple(adapterFactory, adapterOutputType, adapterEntity.getType());
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to create adapter  " + e);
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 41b1915..8f24dec 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -1,5 +1,5 @@
 /*
-x * Copyright 2009-2013 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from