Merge branch 'zheilbron/asterix_msr_demo' of https://code.google.com/p/asterixdb into zheilbron/asterix_msr_demo
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index ec8b6be..3ffc1cc 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
@@ -37,6 +38,7 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 
 public class ConnectFeedStatement implements Statement {
 
@@ -89,11 +91,12 @@
             }
         }
 
-        org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
         try {
             factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
-            adapterOutputType = factoryOutput.getRight().getTypeName();
+            adapterOutputType = factoryOutput.second.getTypeName();
         } catch (AlgebricksException ae) {
+            ae.printStackTrace();
             throw new MetadataException(ae);
         }
 
diff --git a/asterix-external-data/src/main/resources/schema/library.xsd b/asterix-external-data/src/main/resources/schema/library.xsd
index f58175a..00f71f5 100644
--- a/asterix-external-data/src/main/resources/schema/library.xsd
+++ b/asterix-external-data/src/main/resources/schema/library.xsd
@@ -9,9 +9,7 @@
 	<xs:element name="return_type" type="xs:string" />
 	<xs:element name="function_type" type="xs:string" />
 	<xs:element name="definition" type="xs:string" />
-
 	<xs:element name="factory_class" type="xs:string" />
-	<xs:element name="adaptor_type" type="xs:string" />
 
 
 	<!-- definition of complex elements -->
@@ -40,7 +38,6 @@
 			<xs:sequence>
 				<xs:element ref="lib:name" />
 				<xs:element ref="lib:factory_class" />
-				<xs:element ref="lib:adaptor_type" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
@@ -58,7 +55,7 @@
 			<xs:sequence>
 				<xs:element ref="lib:language" />
 				<xs:element ref="lib:libraryFunctions" minOccurs="0" />
-			    <xs:element ref="lib:libraryAdapters" minOccurs="0" />
+				<xs:element ref="lib:libraryAdapters" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
new file mode 100644
index 0000000..cb0142a
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptor.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptor extends StreamBasedAdapter implements IFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PipedOutputStream pos;
+
+    private final PipedInputStream pis;
+
+    private final Map<String, String> configuration;
+
+    private DummyGenerator generator;
+
+    public TestTypedAdaptor(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
+            Map<String, String> configuration) throws IOException {
+        super(parserFactory, sourceDatatype, ctx);
+        pos = new PipedOutputStream();
+        pis = new PipedInputStream(pos);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public InputStream getInputStream(int partition) throws IOException {
+        return pis;
+    }
+
+    @Override
+    public void start(int partition, IFrameWriter frameWriter) throws Exception {
+        generator = new DummyGenerator(configuration, pos);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.execute(generator);
+        super.start(partition, frameWriter);
+    }
+
+    private static class DummyGenerator implements Runnable {
+
+        private final int nOutputRecords;
+        private final OutputStream os;
+        private final byte[] EOL = "\n".getBytes();
+        private boolean continueIngestion;
+
+        public DummyGenerator(Map<String, String> configuration, OutputStream os) {
+            nOutputRecords = Integer.parseInt(configuration.get(TestTypedAdaptorFactory.KEY_NUM_OUTPUT_RECORDS));
+            this.os = os;
+            this.continueIngestion = true;
+        }
+
+        @Override
+        public void run() {
+            DummyRecord dummyRecord = new DummyRecord();
+            try {
+                int i = 0;
+                while (continueIngestion && i < nOutputRecords) {
+                    dummyRecord.reset(i + 1, "" + (i + 1));
+                    os.write(dummyRecord.toString().getBytes());
+                    os.write(EOL);
+                    i++;
+                }
+            } catch (IOException ioe) {
+                ioe.printStackTrace();
+            } finally {
+                try {
+                    os.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        public void stop() {
+            continueIngestion = false;
+        }
+    }
+
+    private static class DummyRecord {
+
+        private int tweetid = 0;
+        private String text = null;
+
+        public void reset(int tweetid, String text) {
+            this.tweetid = tweetid;
+            this.text = text;
+        }
+
+        @Override
+        public String toString() {
+            return "{" + "\"tweetid\":" + "int64(" + "\"" + tweetid + "\"" + ")" + "," + "\"message-text\":" + "\""
+                    + text + "\"" + "}";
+        }
+
+    }
+
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
+    @Override
+    public void stop() throws Exception {
+        generator.stop();
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
new file mode 100644
index 0000000..39f7ab2
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adaptor/TestTypedAdaptorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library.adaptor;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class TestTypedAdaptorFactory implements ITypedAdapterFactory {
+
+    public static final String NAME = "test_typed_adaptor";
+
+    private static ARecordType adapterOutputType = initOutputType();
+
+    public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
+
+    private Map<String, String> configuration;
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    private static ARecordType initOutputType() {
+        String[] fieldNames = new String[] { "tweetid", "message-text" };
+        IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
+        ARecordType outputType = null;
+        try {
+            outputType = new ARecordType("TestTypedAdaptorOutputType", fieldNames, fieldTypes, false);
+        } catch (AsterixException exception) {
+            throw new IllegalStateException("Unable to create output type for adaptor " + NAME);
+        }
+        return outputType;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
+        return new TestTypedAdaptor(tupleParserFactory, adapterOutputType, ctx, configuration);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return adapterOutputType;
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+    }
+
+}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/text_functions.xml
index bd485a4..a0b7bf9 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/text_functions.xml
@@ -50,4 +50,10 @@
 			</definition>
 		</libraryFunction>
 	</libraryFunctions>
+	<libraryAdapters>
+		<libraryAdapter>
+			<name>test_typed_adaptor</name>
+			<factory_class>edu.uci.ics.asterix.external.library.adaptor.TestTypedAdaptorFactory</factory_class>
+		</libraryAdapter>
+	</libraryAdapters>
 </externalLibrary>
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
new file mode 100644
index 0000000..f5fe458
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter.
+                  The feed simulator simulates feed from a file in the local fs.
+                  Associate with the feed an external user-defined function. The UDF 
+                  finds topics in each tweet. A topic is identified by a #. 
+                  Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date         : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+create type TestTypedAdaptorOutputType as closed {
+  tweetid: int64,
+  message-text: string
+}
+
+create dataset TweetsTestAdaptor(TestTypedAdaptorOutputType)
+primary key tweetid;
+
+create feed TestTypedAdaptorFeed
+using "testlib#test_typed_adaptor" (("num_output_records"="5"),("type-name"="TestTypedAdaptorOutputType"));
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
new file mode 100644
index 0000000..a26a148
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
@@ -0,0 +1,14 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter.
+                  The feed simulator simulates feed from a file in the local fs.
+                  Associate with the feed an external user-defined function. The UDF 
+                  finds topics in each tweet. A topic is identified by a #. 
+                  Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date         : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+set wait-for-completion-feed "true";
+
+connect feed TestTypedAdaptorFeed to dataset TweetsTestAdaptor;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
new file mode 100644
index 0000000..733b5a0
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description  : Create a feed dataset that uses the feed simulator adapter.
+                  The feed simulator simulates feed from a file in the local fs.
+                  Associate with the feed an external user-defined function. The UDF 
+                  finds topics in each tweet. A topic is identified by a #. 
+                  Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date         : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+for $x in dataset TweetsTestAdaptor
+order by $x.tweetid
+return $x
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
index 374e1b3..43ff18b 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
@@ -27,9 +27,9 @@
 }
 
 create feed TweetFeed
-using "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory"
+using file_feed
 (("type-name"="TweetInputType"),("fs"="localfs"),("path"="127.0.0.1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
 apply function testlib#parseTweet;
 
-create dataset Tweets(TweetOutputType)
+create dataset TweetsFeedIngest(TweetOutputType) 
 primary key id;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
index 8dfa98d..7414bba 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
@@ -11,4 +11,4 @@
 
 set wait-for-completion-feed "true";
 
-connect feed TweetFeed to dataset Tweets;
+connect feed TweetFeed to dataset TweetsFeedIngest;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
index 5f74687..7d838be 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
@@ -9,5 +9,5 @@
  */
 use dataverse externallibtest;
 
-for $x in dataset Tweets
+for $x in dataset TweetsFeedIngest
 return $x
diff --git a/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
new file mode 100644
index 0000000..2ad7b60
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
@@ -0,0 +1,5 @@
+{ "tweetid": 1i64, "message-text": "1" }
+{ "tweetid": 2i64, "message-text": "2" }
+{ "tweetid": 3i64, "message-text": "3" }
+{ "tweetid": 4i64, "message-text": "4" }
+{ "tweetid": 5i64, "message-text": "5" }
diff --git a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
index 65780d9..be9ba0e 100644
--- a/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/library/testsuite.xml
@@ -45,5 +45,12 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="library-adapters">
+    <test-case FilePath="library-adapters">
+      <compilation-unit name="typed_adapter">
+        <output-dir compare="Text">typed_adapter</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index c6c1559..d29e2c5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -52,6 +53,7 @@
 import edu.uci.ics.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
@@ -96,6 +98,7 @@
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
@@ -414,13 +417,14 @@
 
         FeedDataSource feedDataSource = (FeedDataSource) dataSource;
         FeedIntakeOperatorDescriptor feedIngestor = null;
-        org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
         AlgebricksPartitionConstraint constraint = null;
 
         try {
             factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
-            IAdapterFactory adapterFactory = factoryOutput.getLeft();
-            ARecordType adapterOutputType = factoryOutput.getRight();
+            IAdapterFactory adapterFactory = factoryOutput.first;
+            ARecordType adapterOutputType = factoryOutput.second;
+            AdapterType adapterType = factoryOutput.third;
 
             ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
                     .getSerializerDeserializer(adapterOutputType);
@@ -432,12 +436,25 @@
                 throw new AlgebricksException("Feed not configured with a policy");
             }
             feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
-            feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
-                    feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
-                            .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
-                    feedDesc, feedPolicy.getProperties());
-
-            constraint = factoryOutput.getLeft().getPartitionConstraint();
+            switch (adapterType) {
+                case INTERNAL:
+                    feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+                            feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+                                    .getFeedConnectionId().getDatasetName()), adapterFactory,
+                            (ARecordType) adapterOutputType, feedDesc, feedPolicy.getProperties());
+                    break;
+                case EXTERNAL:
+                    String libraryName = feedDataSource.getFeed().getAdaptorName().split("#")[0];
+                    feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
+                            libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
+                                    .getAdaptorConfiguration(), (ARecordType) adapterOutputType, feedDesc,
+                            feedPolicy.getProperties());
+                    break;
+            }
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Cofigured feed intake operator with " + adapterType + " adapter");
+            }
+            constraint = factoryOutput.first.getPartitionConstraint();
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1024,7 +1041,8 @@
             int datasetId = dataset.getDatasetId();
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
-                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                    ResourceType.LSM_BTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
@@ -1150,7 +1168,8 @@
             int datasetId = dataset.getDatasetId();
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
-                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
+                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                    ResourceType.LSM_INVERTED_INDEX);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
@@ -1241,7 +1260,8 @@
             int datasetId = dataset.getDatasetId();
             TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
             SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
-                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+                    jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                    ResourceType.LSM_RTREE);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
@@ -1427,6 +1447,9 @@
                 "edu.uci.ics.asterix.external.dataset.adapter..RSSFeedAdapterFactory");
         adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter",
                 "edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapterFactory");
+        adapterFactoryMapping.put("edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapter",
+                "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory");
+
         return adapterFactoryMapping;
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 859ed11..b732191 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeId;
 import edu.uci.ics.asterix.common.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.asterix.common.feeds.IFeedManager;
+import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -44,7 +45,7 @@
     private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
 
     /** The type associated with the ADM data output from the feed adaptor */
-    private final IAType outptuType;
+    private final IAType outputType;
 
     /** unique identifier for a feed instance. */
     private final FeedConnectionId feedId;
@@ -58,12 +59,39 @@
     /** The (singleton) instance of IFeedManager **/
     private IFeedManager feedManager;
 
+    /** The library that contains the adapter in use. **/
+    private String adapterLibraryName;
+
+    /**
+     * The adapter factory class that is used to create an instance of the feed adapter.
+     * This value is used only in the case of external adapters.
+     **/
+    private String adapterFactoryClassName;
+
+    /** The configuration parameters associated with the adapter. **/
+    private Map<String, String> adapterConfiguration;
+
+    private ARecordType adapterOutputType;
+
     public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
             ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
         this.adapterFactory = adapterFactory;
-        this.outptuType = atype;
+        this.outputType = atype;
+        this.feedId = feedId;
+        this.feedPolicy = feedPolicy;
+    }
+
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, String adapterLibraryName,
+            String adapterFactoryClassName, Map<String, String> configuration, ARecordType atype,
+            RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactoryClassName = adapterFactoryClassName;
+        this.adapterConfiguration = configuration;
+        this.adapterLibraryName = adapterLibraryName;
+        this.outputType = atype;
         this.feedId = feedId;
         this.feedPolicy = feedPolicy;
     }
@@ -80,7 +108,7 @@
         try {
             if (ingestionRuntime == null) {
                 // create an instance of a feed adaptor to ingest data.
-                adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+                adapter = createAdapter(ctx, partition);
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Beginning new feed:" + feedId);
                 }
@@ -113,10 +141,68 @@
     }
 
     public IAType getOutputType() {
-        return outptuType;
+        return outputType;
     }
 
     public RecordDescriptor getRecordDescriptor() {
         return recordDescriptors[0];
     }
+
+    private IFeedAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        IFeedAdapter feedAdapter = null;
+        if (adapterFactory != null) {
+            feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+        } else {
+            ClassLoader classLoader = ExternalLibraryManager.getLibraryClassLoader(feedId.getDataverse(),
+                    adapterLibraryName);
+            if (classLoader != null) {
+                IAdapterFactory adapterFactory = ((IAdapterFactory) (classLoader.loadClass(adapterFactoryClassName)
+                        .newInstance()));
+
+                switch (adapterFactory.getAdapterType()) {
+                    case TYPED: {
+                        ((ITypedAdapterFactory) adapterFactory).configure(adapterConfiguration);
+                        feedAdapter = (IFeedAdapter) ((ITypedAdapterFactory) adapterFactory).createAdapter(ctx,
+                                partition);
+                    }
+                        break;
+                    case GENERIC: {
+                        String outputTypeName = adapterConfiguration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
+                        if (outputTypeName == null) {
+                            throw new IllegalArgumentException(
+                                    "You must specify the datatype associated with the incoming data. Datatype is specified by the "
+                                            + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
+                        }
+                        ((IGenericAdapterFactory) adapterFactory).configure(adapterConfiguration,
+                                (ARecordType) adapterOutputType);
+                        ((IGenericAdapterFactory) adapterFactory).createAdapter(ctx, partition);
+                    }
+                        break;
+                }
+
+                feedAdapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
+            } else {
+                String message = "Unable to create adapter as class loader not configured for library "
+                        + adapterLibraryName + " in dataverse " + feedId.getDataverse();
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe(message);
+                }
+                throw new IllegalArgumentException(message);
+
+            }
+        }
+        return feedAdapter;
+    }
+
+    public String getAdapterLibraryName() {
+        return adapterLibraryName;
+    }
+
+    public String getAdapterFactoryClassName() {
+        return adapterFactoryClassName;
+    }
+
+    public Map<String, String> getAdapterConfiguration() {
+        return adapterConfiguration;
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 8f140d7..6e0ed83 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
@@ -41,6 +42,7 @@
 import edu.uci.ics.asterix.metadata.functions.ExternalLibraryManager;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -94,9 +96,15 @@
             IOperatorDescriptor opDesc = entry.getValue();
             if (opDesc instanceof FeedIntakeOperatorDescriptor) {
                 FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
-                FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
-                        orig.getAdapterFactory(), (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(),
-                        orig.getFeedPolicy());
+                FeedIntakeOperatorDescriptor fiop;
+                if (orig.getAdapterFactory() != null) {
+                    fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterFactory(),
+                            (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+                } else {
+                    fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(), orig.getAdapterLibraryName(),
+                            orig.getAdapterFactoryClassName(), orig.getAdapterConfiguration(),
+                            (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(), orig.getFeedPolicy());
+                }
                 oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
             } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
                 FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
@@ -221,7 +229,7 @@
 
     }
 
-    public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+    public static Triple<IAdapterFactory, ARecordType, AdapterType> getFeedFactoryAndOutput(Feed feed,
             MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
 
         String adapterName = null;
@@ -229,7 +237,7 @@
         String adapterFactoryClassname = null;
         IAdapterFactory adapterFactory = null;
         ARecordType adapterOutputType = null;
-        Pair<IAdapterFactory, ARecordType> feedProps = null;
+        Triple<IAdapterFactory, ARecordType, AdapterType> feedProps = null;
         try {
             adapterName = feed.getAdaptorName();
             adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -254,8 +262,7 @@
                 }
             } else {
                 adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
-                if (adapterFactoryClassname != null) {
-                } else {
+                if (adapterFactoryClassname == null) {
                     adapterFactoryClassname = adapterName;
                 }
                 adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
@@ -283,7 +290,8 @@
                     throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
 
-            feedProps = Pair.of(adapterFactory, adapterOutputType);
+            feedProps = new Triple<IAdapterFactory, ARecordType, AdapterType>(adapterFactory, adapterOutputType,
+                    adapterEntity.getType());
         } catch (Exception e) {
             e.printStackTrace();
             throw new AlgebricksException("unable to create adapter  " + e);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 9c60779..299bbfb 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -26,17 +26,14 @@
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ABooleanTypeComputer;
-import edu.uci.ics.asterix.om.typecomputer.impl.ACircleTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ADateTimeTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ADateTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ADoubleTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.AFloatTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.AInt32TypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.AInt64TypeComputer;
-import edu.uci.ics.asterix.om.typecomputer.impl.ALineTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ANullTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.APointTypeComputer;
-import edu.uci.ics.asterix.om.typecomputer.impl.APolygonTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ARectangleTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.AStringTypeComputer;
 import edu.uci.ics.asterix.om.typecomputer.impl.ATimeTypeComputer;
@@ -637,12 +634,12 @@
         addFunction(COUNT, AInt64TypeComputer.INSTANCE, true);
         addPrivateFunction(COUNTHASHED_GRAM_TOKENS, OrderedListOfAInt32TypeComputer.INSTANCE, true);
         addPrivateFunction(COUNTHASHED_WORD_TOKENS, OrderedListOfAInt32TypeComputer.INSTANCE, true);
-        addFunction(CREATE_CIRCLE, ACircleTypeComputer.INSTANCE, true);
-        addFunction(CREATE_LINE, ALineTypeComputer.INSTANCE, true);
-        addPrivateFunction(CREATE_MBR, ADoubleTypeComputer.INSTANCE, true);
-        addFunction(CREATE_POINT, APointTypeComputer.INSTANCE, true);
-        addFunction(CREATE_POLYGON, APolygonTypeComputer.INSTANCE, true);
-        addFunction(CREATE_RECTANGLE, ARectangleTypeComputer.INSTANCE, true);
+        addFunction(CREATE_CIRCLE, OptionalACircleTypeComputer.INSTANCE, true);
+        addFunction(CREATE_LINE, OptionalALineTypeComputer.INSTANCE, true);
+        addPrivateFunction(CREATE_MBR, OptionalADoubleTypeComputer.INSTANCE, true);
+        addFunction(CREATE_POINT, OptionalAPointTypeComputer.INSTANCE, true);
+        addFunction(CREATE_POLYGON, OptionalAPolygonTypeComputer.INSTANCE, true);
+        addFunction(CREATE_RECTANGLE, OptionalARectangleTypeComputer.INSTANCE, true);
         addFunction(CREATE_UUID, AUUIDTypeComputer.INSTANCE, false);
 
         addFunction(DATE_CONSTRUCTOR, OptionalADateTypeComputer.INSTANCE, true);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
index 7bf2668..7680c15 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ConcatNonNullTypeComputer.java
@@ -15,12 +15,7 @@
 
 package edu.uci.ics.asterix.om.typecomputer.impl;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -46,29 +41,18 @@
         if (f.getArguments().size() < 1) {
             return BuiltinType.ANULL;
         }
-        List<IAType> possibleTypes = new ArrayList<IAType>();
+
+        TypeCompatibilityChecker tcc = new TypeCompatibilityChecker();
         for (int i = 0; i < f.getArguments().size(); i++) {
             ILogicalExpression arg = f.getArguments().get(i).getValue();
             IAType type = (IAType) env.getType(arg);
-            if (type.getTypeTag() == ATypeTag.UNION) {
-                List<IAType> typeList = ((AUnionType) type).getUnionList();
-                for (IAType t : typeList) {
-                    if (t.getTypeTag() != ATypeTag.NULL) {
-                        //CONCAT_NON_NULL cannot return null because it's only used for if-else construct
-                        if (!possibleTypes.contains(t))
-                            possibleTypes.add(t);
-                    }
-                }
-            } else {
-                if (!possibleTypes.contains(type))
-                    possibleTypes.add(type);
-            }
+            tcc.addPossibleType(type);
         }
-        if (possibleTypes.size() == 1) {
-            return possibleTypes.get(0);
-        } else {
+
+        IAType result = tcc.getCompatibleType();
+        if (result == null) {
             throw new AlgebricksException("The two branches of the if-else clause should return the same type.");
         }
+        return result;
     }
-
 }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java
index c1450ca..bd67f18 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/NonTaggedSwitchCaseComputer.java
@@ -15,10 +15,7 @@
 package edu.uci.ics.asterix.om.typecomputer.impl;
 
 import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
@@ -40,46 +37,27 @@
         if (fce.getArguments().size() < 3)
             throw new AlgebricksException(errMsg1);
 
-        IAType t0;
-        IAType t1;
-        IAType ti;
-
-        ATypeTag tag0;
-        ATypeTag tag1;
-        ATypeTag tagi;
-        try {
-            t0 = (IAType) env.getType(fce.getArguments().get(0).getValue());
-            t1 = (IAType) env.getType(fce.getArguments().get(2).getValue());
-            tag0 = t0.getTypeTag();
-            tag1 = t1.getTypeTag();
-            if (t0.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t0))
-                tag0 = ((AUnionType) t0).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
-                        .getTypeTag();
-            if (t1.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t1))
-                tag1 = ((AUnionType) t1).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
-                        .getTypeTag();
-            for (int i = 2; i < fce.getArguments().size(); i += 2) {
-                ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
-                tagi = ti.getTypeTag();
-                if (ti.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) ti))
-                    tagi = ((AUnionType) ti).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
-                            .getTypeTag();
-                if (tag1 != tagi)
-                    if (!t1.toString().equals(ti.toString()))
-                        throw new AlgebricksException(errMsg2);
-            }
-            for (int i = 1; i < fce.getArguments().size(); i += 2) {
-                ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
-                tagi = ti.getTypeTag();
-                if (ti.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) ti))
-                    tagi = ((AUnionType) ti).getUnionList().get(NonTaggedFormatUtil.OPTIONAL_TYPE_INDEX_IN_UNION_LIST)
-                            .getTypeTag();
-                if (tag0 != tagi)
-                    throw new AlgebricksException(errMsg3);
-            }
-        } catch (AlgebricksException e) {
-            throw new AlgebricksException(e);
+        TypeCompatibilityChecker tcc = new TypeCompatibilityChecker();
+        for (int i = 2; i < fce.getArguments().size(); i += 2) {
+            IAType ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
+            tcc.addPossibleType(ti);
         }
-        return t1;
+        IAType valueType = tcc.getCompatibleType();
+        if (valueType == null) {
+            throw new AlgebricksException(errMsg2);
+        }
+
+        IAType switchType = (IAType) env.getType(fce.getArguments().get(0).getValue());
+        tcc.reset();
+        tcc.addPossibleType(switchType);
+        for (int i = 1; i < fce.getArguments().size(); i += 2) {
+            IAType ti = (IAType) env.getType(fce.getArguments().get(i).getValue());
+            tcc.addPossibleType(ti);
+        }
+        IAType caseType = tcc.getCompatibleType();
+        if (caseType == null) {
+            throw new AlgebricksException(errMsg3);
+        }
+        return valueType;
     }
 }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/TypeCompatibilityChecker.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/TypeCompatibilityChecker.java
new file mode 100644
index 0000000..0739b2f
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/TypeCompatibilityChecker.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.asterix.om.typecomputer.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+class TypeCompatibilityChecker {
+    private final List<IAType> possibleTypes;
+    private boolean nullEncountered;
+
+    public TypeCompatibilityChecker() {
+        possibleTypes = new ArrayList<IAType>();
+        nullEncountered = false;
+    }
+
+    public void reset() {
+        possibleTypes.clear();
+        nullEncountered = false;
+    }
+
+    public void addPossibleType(IAType type) {
+        if (type.getTypeTag() == ATypeTag.UNION) {
+            List<IAType> typeList = ((AUnionType) type).getUnionList();
+            for (IAType t : typeList) {
+                if (t.getTypeTag() != ATypeTag.NULL) {
+                    //CONCAT_NON_NULL cannot return null because it's only used for if-else construct
+                    if (!possibleTypes.contains(t))
+                        possibleTypes.add(t);
+                } else {
+                    nullEncountered = true;
+                }
+            }
+        } else {
+            if (type.getTypeTag() != ATypeTag.NULL) {
+                if (!possibleTypes.contains(type)) {
+                    possibleTypes.add(type);
+                }
+            } else {
+                nullEncountered = true;
+            }
+        }
+    }
+
+    public IAType getCompatibleType() {
+        switch (possibleTypes.size()) {
+            case 0:
+                return BuiltinType.ANULL;
+            case 1:
+                if (nullEncountered) {
+                    return AUnionType.createNullableType(possibleTypes.get(0));
+                } else {
+                    return possibleTypes.get(0);
+                }
+        }
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
index 9ce9b24..ffbbd64 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/types/ATypeTag.java
@@ -15,6 +15,9 @@
 
 package edu.uci.ics.asterix.om.types;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * There is a unique tag for each primitive type and for each kind of
  * non-primitive type in the object model.
@@ -72,6 +75,20 @@
         return value;
     }
 
-    public final static int TYPE_COUNT = ATypeTag.values().length;
+    public static final int TYPE_COUNT = ATypeTag.values().length;
+
+    public static final ATypeTag[] VALUE_TYPE_MAPPING;
+
+    static {
+        List<ATypeTag> typeList = new ArrayList<>();
+        for (ATypeTag tt : values()) {
+            int index = tt.value;
+            while (typeList.size() <= index) {
+                typeList.add(null);
+            }
+            typeList.set(index, tt);
+        }
+        VALUE_TYPE_MAPPING = typeList.toArray(new ATypeTag[typeList.size()]);
+    }
 
 }
\ No newline at end of file
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java
index 08105a2..7cf588e 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/constructors/AStringConstructorDescriptor.java
@@ -16,7 +16,16 @@
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.om.base.ANull;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -32,13 +41,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public class AStringConstructorDescriptor extends AbstractScalarFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final static byte SER_STRING_TYPE_TAG = ATypeTag.STRING.serialize();
-    private final static byte SER_NULL_TYPE_TAG = ATypeTag.NULL.serialize();
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         public IFunctionDescriptor createFunctionDescriptor() {
             return new AStringConstructorDescriptor();
@@ -52,34 +60,109 @@
 
             @Override
             public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
-                return new ICopyEvaluator() {
+                try {
+                    return new ICopyEvaluator() {
 
-                    private DataOutput out = output.getDataOutput();
-                    private ArrayBackedValueStorage outInput = new ArrayBackedValueStorage();
-                    private ICopyEvaluator eval = args[0].createEvaluator(outInput);
-                    private String errorMessage = "This can not be an instance of string";
-                    @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
+                        private DataOutput out = output.getDataOutput();
+                        private ArrayBackedValueStorage outInput = new ArrayBackedValueStorage();
+                        private ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
+                        private PrintStream ps = new PrintStream(baaos, false, "UTF-8");
+                        private ICopyEvaluator eval = args[0].createEvaluator(outInput);
+                        @SuppressWarnings("unchecked")
+                        private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
+                                .getSerializerDeserializer(BuiltinType.ANULL);
 
-                    @Override
-                    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                        @Override
+                        public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                            try {
+                                outInput.reset();
+                                eval.evaluate(tuple);
+                                byte[] serString = outInput.getByteArray();
 
-                        try {
-                            outInput.reset();
-                            eval.evaluate(tuple);
-                            byte[] serString = outInput.getByteArray();
-                            if (serString[0] == SER_STRING_TYPE_TAG) {
-                                out.write(outInput.getByteArray(), outInput.getStartOffset(), outInput.getLength());
-                            } else if (serString[0] == SER_NULL_TYPE_TAG)
-                                nullSerde.serialize(ANull.NULL, out);
-                            else
-                                throw new AlgebricksException(errorMessage);
-                        } catch (IOException e1) {
-                            throw new AlgebricksException(errorMessage);
+                                ATypeTag tt = ATypeTag.VALUE_TYPE_MAPPING[serString[0]];
+                                if (tt == ATypeTag.NULL) {
+                                    nullSerde.serialize(ANull.NULL, out);
+                                } else if (tt == ATypeTag.STRING) {
+                                    out.write(outInput.getByteArray(), outInput.getStartOffset(), outInput.getLength());
+                                } else {
+                                    baaos.write(0);
+                                    baaos.write(0);
+                                    switch (tt) {
+                                        case INT8: {
+                                            int i = AInt8SerializerDeserializer.getByte(outInput.getByteArray(), 1);
+                                            ps.print(i);
+                                            break;
+                                        }
+                                        case INT16: {
+                                            int i = AInt16SerializerDeserializer.getShort(outInput.getByteArray(), 1);
+                                            ps.print(i);
+                                            break;
+                                        }
+                                        case INT32: {
+                                            int i = AInt32SerializerDeserializer.getInt(outInput.getByteArray(), 1);
+                                            ps.print(i);
+                                            break;
+                                        }
+                                        case INT64: {
+                                            long l = AInt64SerializerDeserializer.getLong(outInput.getByteArray(), 1);
+                                            ps.print(l);
+                                            break;
+                                        }
+                                        case DOUBLE: {
+                                            double d = ADoubleSerializerDeserializer.getDouble(outInput.getByteArray(),
+                                                    1);
+                                            ps.print(d);
+                                            break;
+                                        }
+                                        case FLOAT: {
+                                            float f = AFloatSerializerDeserializer.getFloat(outInput.getByteArray(), 1);
+                                            ps.print(f);
+                                            break;
+                                        }
+                                        case BOOLEAN: {
+                                            boolean b = ABooleanSerializerDeserializer.getBoolean(
+                                                    outInput.getByteArray(), 1);
+                                            ps.print(b);
+                                            break;
+                                        }
+
+                                        // NotYetImplemented
+                                        case CIRCLE:
+                                        case DATE:
+                                        case DATETIME:
+                                        case LINE:
+                                        case TIME:
+                                        case DURATION:
+                                        case YEARMONTHDURATION:
+                                        case DAYTIMEDURATION:
+                                        case INTERVAL:
+                                        case ORDEREDLIST:
+                                        case POINT:
+                                        case POINT3D:
+                                        case RECTANGLE:
+                                        case POLYGON:
+                                        case RECORD:
+                                        case UNORDEREDLIST:
+                                        case UUID:
+                                        default:
+                                            throw new AlgebricksException("string of " + tt + " not supported");
+                                    }
+                                    ps.flush();
+                                    byte[] tmpStrBytes = baaos.getByteArray();
+                                    int utfLen = baaos.size() - 2;
+                                    tmpStrBytes[0] = (byte) ((utfLen >>> 8) & 0xFF);
+                                    tmpStrBytes[1] = (byte) ((utfLen >>> 0) & 0xFF);
+                                    out.write(ATypeTag.STRING.serialize());
+                                    out.write(tmpStrBytes);
+                                }
+                            } catch (IOException e) {
+                                throw new AlgebricksException(e);
+                            }
                         }
-                    }
-                };
+                    };
+                } catch (UnsupportedEncodingException e) {
+                    throw new AlgebricksException(e);
+                }
             }
         };
     }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 41b1915..8f24dec 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -1,5 +1,5 @@
 /*
-x * Copyright 2009-2013 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from