fixed an issue related to correct evaluation of type associated with the output from feed adaptor
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index 3ee53a6..10f5c9e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -32,6 +32,10 @@
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 
 public class ConnectFeedStatement implements Statement {
@@ -71,62 +75,62 @@
 
     public void initialize(MetadataTransactionContext mdTxnCtx, Dataset targetDataset, Feed sourceFeed)
             throws MetadataException {
-        query = new Query();
-        FunctionSignature appliedFunction = sourceFeed.getAppliedFunction();
-        Function function = null;
-        String adaptorOutputType = null;
-        if (appliedFunction != null) {
-            function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
-            if (function == null) {
-                throw new MetadataException(" Unknown function " + function);
-            } else {
-                if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
-                    adaptorOutputType = targetDataset.getItemTypeName();
-                } else {
-                    if (function.getParams().size() > 1) {
-                        throw new MetadataException(" Incompatible function: " + appliedFunction
-                                + " Number if arguments must be 1");
-                    }
-                    adaptorOutputType = function.getParams().get(0);
-                }
-            }
-        } else {
-            adaptorOutputType = targetDataset.getItemTypeName();
-        }
-        StringBuilder builder = new StringBuilder();
-        builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
-        builder.append("insert into dataset " + datasetName + " ");
+    	   query = new Query();
+           FunctionSignature appliedFunction = sourceFeed.getAppliedFunction();
+           Function function = null;
+           String adapterOutputType = null;
+           if (appliedFunction != null) {
+               function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+               if (function == null) {
+                   throw new MetadataException(" Unknown function " + function);
+               } else if (function.getParams().size() > 1) {
+                   throw new MetadataException(" Incompatible function: " + appliedFunction
+                           + " Number if arguments must be 1");
+               }
+           }
 
-        if (appliedFunction == null) {
-            builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType + "'"
-                    + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
-            builder.append(" return $x");
-        } else {
-            if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
-                String param = function.getParams().get(0);
-                builder.append(" (" + " for" + " " + param + " in feed-ingest ('" + feedName + "'" + "," + "'"
-                        + adaptorOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
-                builder.append(" let $y:=(" + function.getFunctionBody() + ")" + " return $y");
-            } else {
-                builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType
-                        + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
-                builder.append(" let $y:=" + sourceFeed.getDataverseName() + "." + function.getName() + "(" + "$x"
-                        + ")");
-                builder.append(" return $y");
-            }
+           org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+           try {
+               factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
+               adapterOutputType = factoryOutput.getRight().getTypeName();
+           } catch (AlgebricksException ae) {
+               throw new MetadataException(ae);
+           }
 
-        }
-        builder.append(")");
-        builder.append(";");
-        AQLParser parser = new AQLParser(new StringReader(builder.toString()));
+           StringBuilder builder = new StringBuilder();
+           builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
+           builder.append("insert into dataset " + datasetName + " ");
 
-        List<Statement> statements;
-        try {
-            statements = parser.Statement();
-            query = ((InsertStatement) statements.get(1)).getQuery();
-        } catch (ParseException pe) {
-            throw new MetadataException(pe);
-        }
+           if (appliedFunction == null) {
+               builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType + "'"
+                       + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+               builder.append(" return $x");
+           } else {
+               if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
+                   String param = function.getParams().get(0);
+                   builder.append(" (" + " for" + " " + param + " in feed-ingest ('" + feedName + "'" + "," + "'"
+                           + adapterOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+                   builder.append(" let $y:=(" + function.getFunctionBody() + ")" + " return $y");
+               } else {
+                   builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType
+                           + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+                   builder.append(" let $y:=" + sourceFeed.getDataverseName() + "." + function.getName() + "(" + "$x"
+                           + ")");
+                   builder.append(" return $y");
+               }
+
+           }
+           builder.append(")");
+           builder.append(";");
+           AQLParser parser = new AQLParser(new StringReader(builder.toString()));
+
+           List<Statement> statements;
+           try {
+               statements = parser.Statement();
+               query = ((InsertStatement) statements.get(1)).getQuery();
+           } catch (ParseException pe) {
+               throw new MetadataException(pe);
+           }
 
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 89368e7..b961158 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -60,6 +60,7 @@
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
 import edu.uci.ics.asterix.metadata.feeds.EndFeedMessage;
 import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
@@ -169,7 +170,7 @@
 
     private final AsterixStorageProperties storageProperties;
 
-    private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+    public static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -412,65 +413,30 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
             IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
 
-        DatasourceAdapter adapterEntity;
-        IAdapterFactory adapterFactory;
-        IAType adapterOutputType;
-        String adapterName;
-        String adapterFactoryClassname;
         FeedDataSource feedDataSource = (FeedDataSource) dataSource;
-        try {
-            adapterName = feedDataSource.getFeed().getAdaptorName();
-            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
-                    adapterName);
-            if (adapterEntity != null) {
-                adapterFactoryClassname = adapterEntity.getClassname();
-                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-            } else {
-                adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
-                if (adapterFactoryClassname != null) {
-                } else {
-                    adapterFactoryClassname = adapterName;
-                }
-                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-            }
-
-            Map<String, String> configuration = feedDataSource.getFeed().getAdaptorConfiguration();
-
-            switch (adapterFactory.getAdapterType()) {
-                case TYPED:
-                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
-                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
-                    break;
-                case GENERIC:
-                    String outputTypeName = configuration.get("output-type-name");
-                    adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
-                            feedDataSource.getDatasourceDataverse(), outputTypeName).getDatatype();
-                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
-                    break;
-                default:
-                    throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("unable to create adapter  " + e);
-        }
-
-        ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
-                .getSerializerDeserializer(adapterOutputType);
-        RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
-        FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
-                BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-
-        feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
-        FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
-                feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
-                        .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
-                feedDesc, feedPolicy.getProperties());
-
+        FeedIntakeOperatorDescriptor feedIngestor = null;
+        org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
         AlgebricksPartitionConstraint constraint = null;
+
         try {
-            constraint = adapterFactory.getPartitionConstraint();
+            factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
+            IAdapterFactory adapterFactory = factoryOutput.getLeft();
+            ARecordType adapterOutputType = factoryOutput.getRight();
+
+            ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+                    .getSerializerDeserializer(adapterOutputType);
+            RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+            FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+                    BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+
+            feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
+            feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+                    feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+                            .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
+                    feedDesc, feedPolicy.getProperties());
+
+            constraint = factoryOutput.getLeft().getPartitionConstraint();
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1338,11 +1304,13 @@
     }
 
     /**
-     * Calculate an estimate size of the bloom filter. Note that this is an estimation which assumes that the data
-     * is going to be uniformly distributed across all partitions.
+     * Calculate an estimate size of the bloom filter. Note that this is an
+     * estimation which assumes that the data is going to be uniformly
+     * distributed across all partitions.
      * 
      * @param dataset
-     * @return Number of elements that will be used to create a bloom filter per dataset per partition
+     * @return Number of elements that will be used to create a bloom filter per
+     *         dataset per partition
      * @throws MetadataException
      * @throws AlgebricksException
      */
@@ -1568,13 +1536,14 @@
      * 
      * @param properties
      *            the original dataset properties
-     * @return a new map containing the original dataset properties and the scheduler/locations
+     * @return a new map containing the original dataset properties and the
+     *         scheduler/locations
      */
     private Map<String, Object> wrapProperties(Map<String, String> properties) {
         Map<String, Object> wrappedProperties = new HashMap<String, Object>();
         wrappedProperties.putAll(properties);
-        //wrappedProperties.put(SCHEDULER, hdfsScheduler);
-        //wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
+        // wrappedProperties.put(SCHEDULER, hdfsScheduler);
+        // wrappedProperties.put(CLUSTER_LOCATIONS, getClusterLocations());
         return wrappedProperties;
     }
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
similarity index 77%
rename from asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index c01664a..2ac657f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.file;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -11,15 +11,18 @@
 import org.apache.commons.lang3.tuple.Pair;
 
 import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -188,5 +191,56 @@
         return altered;
 
     }
+    
+    
+    public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+
+        String adapterName = null;
+        DatasourceAdapter adapterEntity = null;
+        String adapterFactoryClassname = null;
+        IAdapterFactory adapterFactory = null;
+        ARecordType adapterOutputType = null;
+        Pair<IAdapterFactory, ARecordType> feedProps = null;
+        try {
+            adapterName = feed.getAdaptorName();
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    adapterName);
+            if (adapterEntity != null) {
+                adapterFactoryClassname = adapterEntity.getClassname();
+                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            } else {
+                adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
+                if (adapterFactoryClassname != null) {
+                } else {
+                    adapterFactoryClassname = adapterName;
+                }
+                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            }
+
+            Map<String, String> configuration = feed.getAdaptorConfiguration();
+
+            switch (adapterFactory.getAdapterType()) {
+                case TYPED:
+                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
+                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
+                    break;
+                case GENERIC:
+                    String outputTypeName = configuration.get("output-type-name");
+                    adapterOutputType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+                            feed.getDataverseName(), outputTypeName).getDatatype();
+                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
+                    break;
+                default:
+                    throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
+            }
+
+            feedProps = Pair.of(adapterFactory, adapterOutputType);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("unable to create adapter  " + e);
+        }
+        return feedProps;
+    }
 
 }