checkpoit
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index 34c5739..5fce340 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -12,6 +12,8 @@
 import edu.uci.ics.asterix.metadata.declared.ExternalFeedDataSource;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
@@ -97,7 +99,6 @@
                     }
                 }
                 v.add(unnest.getVariable());
-
                 DataSourceScanOperator scan = new DataSourceScanOperator(v, metadataProvider.findDataSource(asid));
                 List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
                 scanInpList.addAll(unnest.getInputs());
@@ -141,11 +142,20 @@
                 }
 
                 AqlSourceId asid = new AqlSourceId(dataverseName, datasetName);
+                String policyName = metadataProvider.getConfig().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+                FeedPolicy policy = metadataProvider.findFeedPolicy(dataverseName, policyName);
+                if (policy == null) {
+                    policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
+                    if (policy == null) {
+                        throw new AlgebricksException("Unknown feed policy:" + policyName);
+                    }
+                }
+
                 ArrayList<LogicalVariable> v = new ArrayList<LogicalVariable>();
                 v.add(unnest.getVariable());
 
                 DataSourceScanOperator scan = new DataSourceScanOperator(v, createDummyFeedDataSource(asid, dataset,
-                        metadataProvider));
+                        metadataProvider, policy));
 
                 List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
                 scanInpList.addAll(unnest.getInputs());
@@ -170,7 +180,7 @@
     }
 
     private AqlDataSource createDummyFeedDataSource(AqlSourceId aqlId, Dataset dataset,
-            AqlMetadataProvider metadataProvider) throws AlgebricksException {
+            AqlMetadataProvider metadataProvider, FeedPolicy feedPolicy) throws AlgebricksException {
         if (!aqlId.getDataverseName().equals(
                 metadataProvider.getDefaultDataverse() == null ? null : metadataProvider.getDefaultDataverse()
                         .getDataverseName())) {
@@ -180,6 +190,7 @@
         IAType itemType = metadataProvider.findType(dataset.getDataverseName(), tName);
         ExternalFeedDataSource extDataSource = new ExternalFeedDataSource(aqlId, dataset, itemType,
                 AqlDataSource.AqlDataSourceType.EXTERNAL_FEED);
+        extDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
         return extDataSource;
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
index a91f2eb..b2702ae 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
@@ -362,12 +362,15 @@
     public static class CompiledBeginFeedStatement implements ICompiledDmlStatement {
         private String dataverseName;
         private String datasetName;
+        private String policyName;
         private Query query;
         private int varCounter;
 
-        public CompiledBeginFeedStatement(String dataverseName, String datasetName, Query query, int varCounter) {
+        public CompiledBeginFeedStatement(String dataverseName, String datasetName, String policyName, Query query,
+                int varCounter) {
             this.dataverseName = dataverseName;
             this.datasetName = datasetName;
+            this.policyName = policyName;
             this.query = query;
             this.varCounter = varCounter;
         }
@@ -398,6 +401,10 @@
         public Kind getKind() {
             return Kind.BEGIN_FEED;
         }
+
+        public String getPolicyName() {
+            return policyName;
+        }
     }
 
     public static class CompiledControlFeedStatement implements ICompiledDmlStatement {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 14eea0d..7ad75c8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -64,7 +64,6 @@
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
 import edu.uci.ics.asterix.file.DatasetOperations;
 import edu.uci.ics.asterix.file.DataverseOperations;
 import edu.uci.ics.asterix.file.FeedOperations;
@@ -83,11 +82,12 @@
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -1349,7 +1349,7 @@
                     : activeDefaultDataverse.getDataverseName() : bfs.getDataverseName().getValue();
 
             CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
-                    .getValue(), bfs.getQuery(), bfs.getVarCounter());
+                    .getValue(), bfs.getPolicy(), bfs.getQuery(), bfs.getVarCounter());
 
             Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                     dataverseName, bfs.getDatasetName().getValue());
@@ -1364,7 +1364,7 @@
             bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
             cbfs.setQuery(bfs.getQuery());
             metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
-
+            metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index ce34076..a95ea36 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -20,14 +20,14 @@
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage.MessageType;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.feeds.AlterFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 0faf95e..4c42ebd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -16,11 +16,10 @@
 import edu.uci.ics.asterix.common.api.AsterixAppContextInfo;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
-import edu.uci.ics.asterix.metadata.declared.FeedJobLifecycleListener;
+import edu.uci.ics.asterix.metadata.feeds.FeedJobLifecycleListener;
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCApplicationEntryPoint;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 7dad984..cc985ce 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -105,12 +105,14 @@
         isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
         if (isMetadataNode) {
             registerRemoteMetadataNode(proxy);
+        }
+        MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
+        MetadataManager.INSTANCE.init();
 
+        if (isMetadataNode) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Bootstrapping metadata");
             }
-            MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
-            MetadataManager.INSTANCE.init();
             MetadataBootstrap.startUniverse(runtimeContext, ncApplicationContext,
                     systemState == SystemState.NEW_UNIVERSE);
             MetadataBootstrap.startDDLRecovery();
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.1.ddl.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.1.ddl.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.2.update.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.2.update.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.1.ddl.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.1.ddl.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.2.update.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.2.update.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.1.ddl.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.1.ddl.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.2.update.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.2.update.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue423/query-issue423.1.adm b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue423/query-issue423.1.adm
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue423/query-issue423.1.adm
+++ /dev/null
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
index d7de106..6b9006e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
@@ -10,7 +10,6 @@
 import edu.uci.ics.asterix.aql.parser.ParseException;
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -18,17 +17,20 @@
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 
 public class BeginFeedStatement implements Statement {
 
     private final Identifier dataverseName;
     private final Identifier datasetName;
+    private final String policy;
     private Query query;
     private int varCounter;
 
-    public BeginFeedStatement(Identifier dataverseName, Identifier datasetName, int varCounter) {
+    public BeginFeedStatement(Identifier dataverseName, Identifier datasetName, String policy, int varCounter) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
+        this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
         this.varCounter = varCounter;
     }
 
@@ -96,6 +98,10 @@
         return Kind.BEGIN_FEED;
     }
 
+    public String getPolicy() {
+        return policy;
+    }
+
     @Override
     public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
         return visitor.visitBeginFeedStatement(this, arg);
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 2a5f534..a00ec65 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -498,6 +498,18 @@
     }
 }
 
+String GetPolicy() throws ParseException:
+{
+  String policy = null;
+}
+{
+   "using" "policy" policy = Identifier() 
+   {
+     return policy;
+   }
+   
+}   
+
 FunctionSignature FunctionSignature() throws ParseException:
 {
   Pair<Identifier,Identifier> pairId = null;
@@ -758,12 +770,13 @@
   Pair<Identifier,Identifier> nameComponents = null;
   Map<String,String> configuration = null;
   Statement stmt = null;
+  String policy = null;
 }
 {
   (
-    "begin" "feed" nameComponents = QualifiedName()
+    "begin" "feed" nameComponents = QualifiedName()  (policy = GetPolicy())? 
       {
-        stmt = new BeginFeedStatement(nameComponents.first, nameComponents.second, getVarCounter());
+        stmt = new BeginFeedStatement(nameComponents.first, nameComponents.second, policy, getVarCounter());
       }
     | "suspend" "feed" nameComponents = QualifiedName()
       {
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index e702ef3..63a791b 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,7 +18,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 2e99b7c..b54d032 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -96,6 +96,13 @@
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
+			<groupId>edu.uci.ics.asterix</groupId>
+			<artifactId>asterix-metadata</artifactId>
+			<version>0.0.6-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
 			<groupId>com.kenai.nbpwr</groupId>
 			<artifactId>org-apache-commons-io</artifactId>
 			<version>1.3.1-201002241208</version>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..d3a1995 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -17,7 +17,8 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
 
 /**
  * A factory class for creating the @see {CNNFeedAdapter}.  
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..7cd3c9a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -17,7 +17,8 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.IAType;
 
 /**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..f724403 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -17,7 +17,8 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.IAType;
 
 /**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..9706a77 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -16,8 +16,9 @@
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.IAType;
 
 /**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..fd8f24f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -16,8 +16,9 @@
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
 
 /**
  * Factory class for creating an instance of PullBasedTwitterAdapter.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..e9c37fa 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -16,8 +16,9 @@
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
 
 /**
  * Factory class for creating an instance of @see {RSSFeedAdapter}.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
deleted file mode 100644
index 29d0c94..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.operator;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * Operator responsible for ingesting data from an external source. This
- * operator uses a (configurable) adapter associated with the feed dataset.
- */
-public class FeedIntakeOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
-
-	private static final long serialVersionUID = 1L;
-
-	private final String adapterFactoryClassName;
-	private final Map<String, String> adapterConfiguration;
-	private final IAType atype;
-	private final FeedId feedId;
-
-	private transient IAdapterFactory datasourceAdapterFactory;
-
-	public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
-			String adapter, Map<String, String> arguments, ARecordType atype,
-			RecordDescriptor rDesc) {
-		super(spec, 1, 1);
-		recordDescriptors[0] = rDesc;
-		this.adapterFactoryClassName = adapter;
-		this.adapterConfiguration = arguments;
-		this.atype = atype;
-		this.feedId = feedId;
-	}
-
-	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, final int partition,
-			int nPartitions) throws HyracksDataException {
-		ITypedDatasourceAdapter adapter;
-		try {
-			datasourceAdapterFactory = (IAdapterFactory) Class.forName(
-					adapterFactoryClassName).newInstance();
-			if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration, atype);
-			} else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration);
-			} else {
-				throw new IllegalStateException(
-						" Unknown adapter factory type for "
-								+ adapterFactoryClassName);
-			}
-			adapter.initialize(ctx);
-		} catch (Exception e) {
-			throw new HyracksDataException("initialization of adapter failed",
-					e);
-		}
-		return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
-	}
-
-	public FeedId getFeedId() {
-		return feedId;
-	}
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
deleted file mode 100644
index 4d10a81..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.operator;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/**
- * The runtime for @see{FeedIntakeOperationDescriptor}
- */
-public class FeedIntakeOperatorNodePushable extends
-		AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
-	private final IDatasourceAdapter adapter;
-	private final int partition;
-	private final IFeedManager feedManager;
-	private final FeedId feedId;
-	private final LinkedBlockingQueue<IFeedMessage> inbox;
-	private FeedInboxMonitor feedInboxMonitor;
-
-	public FeedIntakeOperatorNodePushable(FeedId feedId,
-			IDatasourceAdapter adapter, int partition) {
-		this.adapter = adapter;
-		this.partition = partition;
-		this.feedManager = (IFeedManager) FeedManager.INSTANCE;
-		this.feedId = feedId;
-		inbox = new LinkedBlockingQueue<IFeedMessage>();
-	}
-
-	@Override
-	public void open() throws HyracksDataException {
-		if (adapter instanceof IManagedFeedAdapter) {
-			feedInboxMonitor = new FeedInboxMonitor(
-					(IManagedFeedAdapter) adapter, inbox, partition);
-			feedInboxMonitor.start();
-			feedManager.registerFeedMsgQueue(feedId, inbox);
-		}
-		writer.open();
-		try {
-			adapter.start(partition, writer);
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new HyracksDataException(e);
-		} finally {
-			writer.close();
-			if (adapter instanceof IManagedFeedAdapter) {
-				feedManager.unregisterFeedMsgQueue(feedId, inbox);
-			}
-		}
-	}
-
-	@Override
-	public void fail() throws HyracksDataException {
-		writer.close();
-	}
-
-	@Override
-	public void close() throws HyracksDataException {
-
-	}
-
-	@Override
-	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-		// do nothing
-	}
-}
-
-class FeedInboxMonitor extends Thread {
-
-	private LinkedBlockingQueue<IFeedMessage> inbox;
-	private final IManagedFeedAdapter adapter;
-
-	public FeedInboxMonitor(IManagedFeedAdapter adapter,
-			LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
-		this.inbox = inbox;
-		this.adapter = adapter;
-	}
-
-	@Override
-	public void run() {
-		while (true) {
-			try {
-				IFeedMessage feedMessage = inbox.take();
-				switch (feedMessage.getMessageType()) {
-				case STOP:
-					adapter.stop();
-					break;
-				case ALTER:
-					adapter.alter(((AlterFeedMessage) feedMessage)
-							.getAlteredConfParams());
-					break;
-				}
-			} catch (InterruptedException ie) {
-				break;
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..7a8491a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -19,7 +19,8 @@
 import java.util.List;
 import java.util.Map;
 
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
 
 /**
  * An Adapter that provides the functionality of fetching news feed from CNN service
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..c520b9f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.asterix.external.util.DNSResolverFactory;
 import edu.uci.ics.asterix.external.util.INodeResolver;
 import edu.uci.ics.asterix.external.util.INodeResolverFactory;
+import edu.uci.ics.asterix.metadata.feeds.AbstractDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..1c49373 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,7 @@
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.metadata.feeds.AbstractDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 634656d..3d1c2aa 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -15,12 +15,17 @@
 package edu.uci.ics.asterix.external.dataset.adapter;
 
 import java.nio.ByteBuffer;
+import java.rmi.RemoteException;
 import java.util.Map;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.AbstractFeedDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyEnforcer;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -32,7 +37,7 @@
  * Captures the common logic for obtaining bytes from an external source
  * and packing them into frames as tuples.
  */
-public abstract class PullBasedAdapter extends AbstractDatasourceAdapter implements ITypedDatasourceAdapter,
+public abstract class PullBasedAdapter extends AbstractFeedDatasourceAdapter implements ITypedDatasourceAdapter,
         IManagedFeedAdapter {
 
     private static final long serialVersionUID = 1L;
@@ -46,9 +51,15 @@
     protected boolean continueIngestion = true;
     protected boolean alterRequested = false;
     private Map<String, String> modifiedConfiguration = null;
+    private long tupleCount = 0;
+    private FeedPolicyEnforcer policyEnforcer;
 
     public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
 
+    public long getIngestedRecordsCount() {
+        return tupleCount;
+    }
+
     public void alter(Map<String, String> modifedConfiguration) {
         this.modifiedConfiguration = modifedConfiguration;
     }
@@ -69,13 +80,14 @@
                     case DATA_AVAILABLE:
                         tupleBuilder.addFieldEndOffset();
                         appendTupleToFrame(writer);
+                        tupleCount++;
                         break;
                     case NO_MORE_DATA:
                         LOGGER.info("Reached end of feed");
                         FrameUtils.flushFrame(frame, writer);
                         continueIngestion = false;
                         break;
-                    case DATA_NOT_AVAILABLE: 
+                    case DATA_NOT_AVAILABLE:
                         break;
                 }
                 if (alterRequested) {
@@ -87,9 +99,14 @@
                 }
             } catch (Exception failureException) {
                 try {
-                    pullBasedFeedClient.resetOnFailure(failureException);
-                    tupleBuilder.reset();
-                    continue;
+                    boolean continueIngestion = policyEnforcer.handleSoftwareFailure(failureException);
+                    if (continueIngestion) {
+                        pullBasedFeedClient.resetOnFailure(failureException);
+                        tupleBuilder.reset();
+                        continue;
+                    } else {
+                        throw failureException;
+                    }
                 } catch (Exception recoveryException) {
                     throw new Exception(recoveryException);
                 }
@@ -118,8 +135,10 @@
      * 
      * @throws Exception
      */
-    public void stop() {
+    public void stop() throws Exception {
         continueIngestion = false;
+        dumpStatistics();
+        timer.cancel();
     }
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ef69018..b658d1a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -16,7 +16,7 @@
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..b3f4dd1 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -19,7 +19,7 @@
 import java.util.List;
 import java.util.Map;
 
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-metadata/pom.xml b/asterix-metadata/pom.xml
index 425e8ff..58d7b2a 100644
--- a/asterix-metadata/pom.xml
+++ b/asterix-metadata/pom.xml
@@ -37,7 +37,7 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.asterix</groupId>
-			<artifactId>asterix-external-data</artifactId>
+			<artifactId>asterix-runtime</artifactId>
 			<version>0.0.6-SNAPSHOT</version>
 			<scope>compile</scope>
 		</dependency>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
index a4be0f8..89b75d7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
@@ -21,16 +21,17 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
 
 /**
  * Caches metadata entities such that the MetadataManager does not have to
@@ -56,6 +57,9 @@
     // Key is FeedId   
     protected final Map<FeedId, FeedActivity> feedActivity = new HashMap<FeedId, FeedActivity>();
 
+    // Key is DataverseName, Key of the value map is the Policy name   
+    protected final Map<String, Map<String, FeedPolicy>> feedPolicies = new HashMap<String, Map<String, FeedPolicy>>();
+
     // Atomically executes all metadata operations in ctx's log.
     public void commit(MetadataTransactionContext ctx) {
         // Forward roll the operations written in ctx's log.
@@ -395,6 +399,32 @@
         }
     }
 
+    public Object addFeedPolicyIfNotExists(FeedPolicy feedPolicy) {
+        synchronized (feedPolicy) {
+            Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+            if (p == null) {
+                p = new HashMap<String, FeedPolicy>();
+                p.put(feedPolicy.getPolicyName(), feedPolicy);
+                feedPolicies.put(feedPolicy.getDataverseName(), p);
+            } else {
+                if (p.get(feedPolicy.getPolicyName()) == null) {
+                    p.put(feedPolicy.getPolicyName(), feedPolicy);
+                }
+            }
+            return null;
+        }
+    }
+
+    public Object dropFeedPolicy(FeedPolicy feedPolicy) {
+        synchronized (feedPolicies) {
+            Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+            if (p != null && p.get(feedPolicy.getPolicyName()) != null) {
+                return p.remove(feedPolicy).getPolicyName();
+            }
+            return null;
+        }
+    }
+
     public Object addAdapterIfNotExists(DatasourceAdapter adapter) {
         synchronized (adapters) {
             DatasourceAdapter adapterObject = adapters.get(adapter.getAdapterIdentifier().getNamespace()).get(
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java
index fe613da..441816d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 
+
 public class MetadataException extends AsterixException {
     private static final long serialVersionUID = 1L;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 73df2a8..6362cfd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -22,7 +22,6 @@
 
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.api.IMetadataManager;
 import edu.uci.ics.asterix.metadata.api.IMetadataNode;
@@ -31,10 +30,12 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
@@ -543,6 +544,16 @@
     }
 
     @Override
+    public void addFeedPolicy(MetadataTransactionContext mdTxnCtx, FeedPolicy feedPolicy) throws MetadataException {
+        try {
+            metadataNode.addFeedPolicy(mdTxnCtx.getJobId(), feedPolicy);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+        mdTxnCtx.addFeedPolicy(feedPolicy);
+    }
+
+    @Override
     public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException {
         try {
             metadataNode.initializeDatasetIdFactory(ctx.getJobId());
@@ -640,4 +651,17 @@
         metadataLatch.readLock().unlock();
     }
 
+    @Override
+    public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
+            throws MetadataException {
+
+        FeedPolicy FeedPolicy = null;
+        try {
+            FeedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName);
+        } catch (RemoteException e) {
+            throw new MetadataException(e);
+        }
+        return FeedPolicy;
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index d363d01..cad1e33 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -25,8 +25,6 @@
 import edu.uci.ics.asterix.common.context.AsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
 import edu.uci.ics.asterix.metadata.api.IMetadataNode;
@@ -38,6 +36,7 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -48,10 +47,13 @@
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedActivityTupleTranslator;
+import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
 import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeTupleTranslator;
+import edu.uci.ics.asterix.metadata.feeds.FeedActivityIdFactory;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.metadata.valueextractors.DatasetNameValueExtractor;
 import edu.uci.ics.asterix.metadata.valueextractors.DatatypeNameValueExtractor;
 import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor;
@@ -770,6 +772,7 @@
             }
             return results.get(0);
         } catch (Exception e) {
+            e.printStackTrace();
             throw new MetadataException(e);
         }
     }
@@ -1189,4 +1192,40 @@
         }
 
     }
+
+    @Override
+    public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException {
+        try {
+            // Insert into the 'FeedPolicy' dataset.
+            FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(true);
+            ITupleReference feedPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(feedPolicy);
+            insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, feedPolicyTuple);
+
+        } catch (BTreeDuplicateKeyException e) {
+            throw new MetadataException("A feed policy with this name " + feedPolicy.getPolicyName()
+                    + " already exists in dataverse '" + feedPolicy.getPolicyName() + "'.", e);
+        } catch (Exception e) {
+            throw new MetadataException(e);
+        }
+
+    }
+
+    @Override
+    public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policyName) throws MetadataException,
+            RemoteException {
+
+        try {
+            ITupleReference searchKey = createTuple(dataverse, policyName);
+            FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(false);
+            List<FeedPolicy> results = new ArrayList<FeedPolicy>();
+            IValueExtractor<FeedPolicy> valueExtractor = new MetadataEntityValueExtractor<FeedPolicy>(tupleReaderWriter);
+            searchIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, searchKey, valueExtractor, results);
+            if (!results.isEmpty()) {
+                return results.get(0);
+            }
+            return null;
+        } catch (Exception e) {
+            throw new MetadataException(e);
+        }
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
index cbd37d6..8bd7056 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
@@ -18,15 +18,16 @@
 import java.util.ArrayList;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 
 /**
@@ -200,4 +201,10 @@
         droppedCache.clear();
         opLog.clear();
     }
+
+    public void addFeedPolicy(FeedPolicy feedPolicy) {
+        droppedCache.dropFeedPolicy(feedPolicy);
+        logAndApply(new MetadataLogicalOperation(feedPolicy, true));
+
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
index ca6ab08..9870df2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
@@ -18,15 +18,12 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 
 /**
  * Descriptor interface for a primary or secondary index on metadata datasets.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 5eff5b3..59248eb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -19,7 +19,6 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -27,12 +26,13 @@
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 
 /**
  * A metadata manager provides user access to Asterix metadata (e.g., types,
@@ -462,6 +462,16 @@
     public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
             throws MetadataException;
 
+    /**
+     * @param ctx
+     * @param policy
+     * @throws MetadataException
+     */
+    public void addFeedPolicy(MetadataTransactionContext ctx, FeedPolicy policy) throws MetadataException;
+
+    public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
+            throws MetadataException;
+
     public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
 
     public int getMostRecentDatasetId() throws MetadataException;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index e58a2f0..b524acc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -21,17 +21,18 @@
 import java.util.List;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.Node;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 
@@ -504,4 +505,23 @@
     public void registerFeedActivity(JobId jobId, FeedId feedId, FeedActivity feedActivity) throws MetadataException,
             RemoteException;
 
+    /**
+     * @param jobId
+     * @param feedPolicy
+     * @throws MetadataException
+     * @throws RemoteException
+     */
+    public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException;
+
+    /**
+     * @param jobId
+     * @param dataverse
+     * @param policy
+     * @return
+     * @throws MetadataException
+     * @throws RemoteException
+     */
+    public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
+            RemoteException;
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index e6eb4fe..faac080 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -33,8 +33,6 @@
 import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
 import edu.uci.ics.asterix.common.context.AsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -46,12 +44,16 @@
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
 import edu.uci.ics.asterix.metadata.entities.Node;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
@@ -186,6 +188,7 @@
                 insertNodes(mdTxnCtx);
                 insertInitialGroups(mdTxnCtx);
                 insertInitialAdapters(mdTxnCtx);
+                insertInitialFeedPolicies(mdTxnCtx);
 
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Finished creating metadata B-trees.");
@@ -327,6 +330,12 @@
         }
     }
 
+    private static void insertInitialFeedPolicies(MetadataTransactionContext mdTxnCtx) throws Exception {
+        for (FeedPolicy feedPolicy : BuiltinFeedPolicies.policies) {
+            MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, feedPolicy);
+        }
+    }
+
     private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws Exception {
         String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getName();
         return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName),
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
index c0ce030..db295d4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
@@ -27,17 +27,13 @@
 import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
-import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 
 /**
  * Descriptor for a primary or secondary index on metadata datasets.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 631be89..f922169 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -49,6 +49,8 @@
     public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
     public static ARecordType FEED_ACTIVITY_RECORDTYPE;
     public static ARecordType FEED_POLICY_RECORDTYPE;
+    public static ARecordType POLICY_PARAMS_RECORDTYPE;
+    public static ARecordType FEED_ACTIVITY_DETAILS_RECORDTYPE;
 
     /**
      * Create all metadata record types.
@@ -58,6 +60,7 @@
         // depend on other types being created first.
         // These calls are one "dependency chain".
         try {
+            POLICY_PARAMS_RECORDTYPE = createPropertiesRecordType();
             DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE = createPropertiesRecordType();
             INTERNAL_DETAILS_RECORDTYPE = createInternalDetailsRecordType();
             EXTERNAL_DETAILS_RECORDTYPE = createExternalDetailsRecordType();
@@ -78,6 +81,7 @@
             NODEGROUP_RECORDTYPE = createNodeGroupRecordType();
             FUNCTION_RECORDTYPE = createFunctionRecordType();
             DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
+            FEED_ACTIVITY_DETAILS_RECORDTYPE = createPropertiesRecordType();
             FEED_ACTIVITY_RECORDTYPE = createFeedActivityRecordType();
             FEED_POLICY_RECORDTYPE = createFeedPolicyRecordType();
         } catch (AsterixException e) {
@@ -91,12 +95,10 @@
     public static final int FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX = 3;
 
     private static ARecordType createFeedPolicyRecordType() throws AsterixException {
-        IAType propertiesRecordType = createPropertiesRecordType();
-        AUnorderedListType listPropertiesType = new AUnorderedListType(propertiesRecordType, null);
-
+        AUnorderedListType listPropertiesType = new AUnorderedListType(POLICY_PARAMS_RECORDTYPE, null);
         String[] fieldNames = { "DataverseName", "PolicyName", "Description", "Properties" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, listPropertiesType };
-        return new ARecordType(null, fieldNames, fieldTypes, true);
+        return new ARecordType("FeedPolicyRecordType", fieldNames, fieldTypes, true);
     }
 
     // Helper constants for accessing fields in an ARecord of type
@@ -115,8 +117,8 @@
     // Helper constants for accessing fields in an ARecord of anonymous type
     // dataset properties.
     // Used for dataset hints or dataset adapter properties.
-    public static final int DATASOURCE_PROPERTIES_NAME_FIELD_INDEX = 0;
-    public static final int DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX = 1;
+    public static final int PROPERTIES_NAME_FIELD_INDEX = 0;
+    public static final int PROPERTIES_VALUE_FIELD_INDEX = 1;
 
     private static final ARecordType createPropertiesRecordType() throws AsterixException {
         String[] fieldNames = { "Name", "Value" };
@@ -385,16 +387,15 @@
     public static final int FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX = 1;
     public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX = 2;
     public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 3;
-    public static final int FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX = 4;
-    public static final int FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX = 5;
-    public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6;
+    public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 4;
+    public static final int FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX = 5;
 
     private static ARecordType createFeedActivityRecordType() throws AsterixException {
-        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
-        String[] fieldNames = { "DataverseName", "DatasetName", "ActivityId", "ActivityType", "IngestNodes",
-                "ComputeNodes", "UpdateTimestamp" };
+        AUnorderedListType unorderedPropertyListType = new AUnorderedListType(FEED_ACTIVITY_DETAILS_RECORDTYPE, null);
+        String[] fieldNames = { "DataverseName", "DatasetName", "ActivityId", "ActivityType", "UpdateTimestamp",
+                "Details" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32, BuiltinType.ASTRING,
-                unorderedListType, unorderedListType, BuiltinType.ASTRING };
+                BuiltinType.ASTRING, unorderedPropertyListType };
         return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true);
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java
index 8957ca7..107eb53 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java
@@ -16,6 +16,7 @@
 
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 
+
 /**
  * Represents a hint provided as part of an AQL statement.
  */
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index 0ed3c78..e3e4cb7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -16,8 +16,11 @@
 package edu.uci.ics.asterix.metadata.declared;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
@@ -51,6 +54,7 @@
     private IAType[] schemaTypes;
     private INodeDomain domain;
     private AqlDataSourceType datasourceType;
+    private Map<String, Serializable> properties = new HashMap<String, Serializable>();
 
     public enum AqlDataSourceType {
         INTERNAL,
@@ -268,4 +272,12 @@
         return datasourceType;
     }
 
+    public Map<String, Serializable> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, Serializable> properties) {
+        this.properties = properties;
+    }
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index b98e2d7..80e1b9f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -33,16 +33,6 @@
 import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
-import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.external.data.operator.FeedMessageOperatorDescriptor;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
@@ -57,9 +47,22 @@
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.types.ARecordType;
@@ -309,7 +312,7 @@
         IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
                 .getDatatype();
         if (dataSource instanceof ExternalFeedDataSource) {
-            return buildFeedIntakeRuntime(jobSpec, dataset);
+            return buildFeedIntakeRuntime(jobSpec, dataset, dataSource);
         } else {
             return buildExternalDataScannerRuntime(jobSpec, itemType,
                     (ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
@@ -399,7 +402,7 @@
 
     @SuppressWarnings("rawtypes")
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
-            Dataset dataset) throws AlgebricksException {
+            Dataset dataset, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
 
         FeedDatasetDetails datasetDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
         DatasourceAdapter adapterEntity;
@@ -449,9 +452,11 @@
                 .getSerializerDeserializer(adapterOutputType);
         RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
+        FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+                BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
                 dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
-                datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+                datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc, feedPolicy.getProperties());
 
         AlgebricksPartitionConstraint constraint = null;
         try {
@@ -467,7 +472,7 @@
             String dataverse, String dataset, List<IFeedMessage> feedMessages, FeedActivity feedActivity)
             throws AlgebricksException {
         AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(feedActivity
-                .getIngestNodes().toArray(new String[] {}));
+                .getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS).split(","));
         FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
                 feedMessages);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
@@ -1455,6 +1460,14 @@
         return type.getDatatype();
     }
 
+    public FeedPolicy findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+        try {
+            return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverse, policyName);
+        } catch (MetadataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
         try {
             return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java
index 586be9c..777b335 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java
@@ -18,6 +18,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 
+
 public class FileSplitDataSink implements IDataSink {
 
     private FileSplitSinkId id;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java
index 00625fe..59aa62b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 
+
 public class FileSplitSinkId {
 
     private FileSplit fileSplit;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java
index ad91111..93faef5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java
@@ -20,6 +20,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ResultSetDomain;
 
+
 public class ResultSetDataSink implements IDataSink {
 
     private ResultSetSinkId id;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java
index 1eb4336..6db14da 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java
@@ -16,6 +16,7 @@
 
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 
+
 public class ResultSetSinkId {
 
     private final ResultSetId resultSetId;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java
index 2955a08..6f63199 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java
@@ -14,9 +14,9 @@
  */
 package edu.uci.ics.asterix.metadata.entities;
 
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
 import edu.uci.ics.asterix.metadata.MetadataCache;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
 
 public class DatasourceAdapter implements IMetadataEntity {
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index eb2673f..72a1f03 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.asterix.metadata.entities;
 
 import java.util.List;
+import java.util.Map;
 
 import edu.uci.ics.asterix.metadata.MetadataCache;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
@@ -33,10 +34,9 @@
     // Enforced to be unique within a dataverse.
     private final String datasetName;
 
-    private List<String> ingestNodes;
-    private List<String> computeNodes;
     private String lastUpdatedTimestamp;
     private FeedActivityType activityType;
+    private Map<String, String> feedActivityDetails;
 
     public static enum FeedActivityType {
         FEED_BEGIN,
@@ -47,13 +47,22 @@
         FEED_SHRINK
     }
 
+    public static class FeedActivityDetails {
+        public static final String COMPUTE_LOCATIONS = "compute-locations";
+        public static final String INGEST_LOCATIONS = "ingest-locations";
+        public static final String TOTAL_INGESTED = "total-ingested";
+        public static final String INGESTION_RATE = "ingestion-rate";
+        public static final String EXCEPTION_LOCATION = "exception-location";
+        public static final String EXCEPTION_MESSAGE = "exception-message";
+
+    }
+
     public FeedActivity(String dataverseName, String datasetName, FeedActivityType feedActivityType,
-            List<String> ingestNodes, List<String> computeNodes) {
+            Map<String, String> feedActivityDetails) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.activityType = feedActivityType;
-        this.ingestNodes = ingestNodes;
-        this.computeNodes = computeNodes;
+        this.feedActivityDetails = feedActivityDetails;
     }
 
     public String getDataverseName() {
@@ -100,22 +109,6 @@
         this.activityType = feedActivityType;
     }
 
-    public List<String> getIngestNodes() {
-        return ingestNodes;
-    }
-
-    public void setIngestNodes(List<String> ingestNodes) {
-        this.ingestNodes = ingestNodes;
-    }
-
-    public List<String> getComputeNodes() {
-        return computeNodes;
-    }
-
-    public void setComputeNodes(List<String> computeNodes) {
-        this.computeNodes = computeNodes;
-    }
-
     public String getLastUpdatedTimestamp() {
         return lastUpdatedTimestamp;
     }
@@ -132,8 +125,25 @@
         this.activityId = activityId;
     }
 
+    public Map<String, String> getFeedActivityDetails() {
+        return feedActivityDetails;
+    }
+
+    public void setFeedActivityDetails(Map<String, String> feedActivityDetails) {
+        this.feedActivityDetails = feedActivityDetails;
+    }
+
+    public FeedActivityType getActivityType() {
+        return activityType;
+    }
+
+    public void setActivityType(FeedActivityType activityType) {
+        this.activityType = activityType;
+    }
+
     @Override
     public int compareTo(FeedActivity o) {
         return this.activityId - o.getActivityId();
     }
+
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java
index f796cb7..b011e5c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java
@@ -30,7 +30,7 @@
     private final String dataverseName;
     // Enforced to be unique within a dataverse.
     private final String policyName;
-    // A descriptiokn of the policy
+    // A description of the policy
     private final String description;
     // The policy properties associated with the feed dataset
     private Map<String, String> properties;
@@ -39,6 +39,7 @@
         this.dataverseName = dataverseName;
         this.policyName = policyName;
         this.description = description;
+        this.properties = properties;
     }
 
     public String getDataverseName() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 2a68ae9..65b2d78 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -138,9 +138,9 @@
                 String value;
                 while (cursor.next()) {
                     ARecord field = (ARecord) cursor.get();
-                    key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
+                    key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
                             .getStringValue();
-                    value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
+                    value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
                             .getStringValue();
                     properties.put(key, value);
                 }
@@ -228,9 +228,9 @@
                 String value;
                 while (cursor.next()) {
                     ARecord field = (ARecord) cursor.get();
-                    key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
+                    key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
                             .getStringValue();
-                    value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
+                    value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
                             .getStringValue();
                     properties.put(key, value);
                 }
@@ -359,9 +359,9 @@
         IACursor cursor = list.getCursor();
         while (cursor.next()) {
             ARecord field = (ARecord) cursor.get();
-            key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
+            key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
                     .getStringValue();
-            value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
+            value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
                     .getStringValue();
             hints.put(key, value);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 4a5e4dcf..e93fd97 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -22,13 +22,13 @@
 import java.util.Calendar;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
 import edu.uci.ics.asterix.om.base.ARecord;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
index e81eb3e..51678dc2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
@@ -18,11 +18,16 @@
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
 import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -33,6 +38,7 @@
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.ARecord;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.base.AUnorderedList;
@@ -40,6 +46,7 @@
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -93,22 +100,20 @@
         String feedActivityType = ((AString) feedActivityRecord
                 .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX)).getStringValue();
 
-        List<String> ingestNodes = new ArrayList<String>();
         IACursor cursor = ((AUnorderedList) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX)).getCursor();
+        Map<String, String> activityDetails = new HashMap<String, String>();
+        String key;
+        String value;
         while (cursor.next()) {
-            ingestNodes.add(((AString) cursor.get()).getStringValue());
-        }
-
-        List<String> computeNodes = new ArrayList<String>();
-        cursor = ((AUnorderedList) feedActivityRecord
-                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX)).getCursor();
-        while (cursor.next()) {
-            computeNodes.add(((AString) cursor.get()).getStringValue());
+            ARecord field = (ARecord) cursor.get();
+            key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
+            value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
+            activityDetails.put(key, value);
         }
 
         FeedActivity fa = new FeedActivity(dataverseName, datasetName, FeedActivityType.valueOf(feedActivityType),
-                ingestNodes, computeNodes);
+                activityDetails);
         fa.setActivityId(activityId);
         return fa;
     }
@@ -159,38 +164,27 @@
         recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX, fieldValue);
 
         // write field 4
-        UnorderedListBuilder listBuilder = new UnorderedListBuilder();
-        listBuilder
-                .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX]);
-        for (String field : feedActivity.getIngestNodes()) {
-            itemValue.reset();
-            aString.setValue(field);
-            stringSerde.serialize(aString, itemValue.getDataOutput());
-            listBuilder.addItem(itemValue);
-        }
-        fieldValue.reset();
-        listBuilder.write(fieldValue.getDataOutput(), true);
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
-
-        // write field 5
-        listBuilder
-                .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
-        for (String field : feedActivity.getIngestNodes()) {
-            itemValue.reset();
-            aString.setValue(field);
-            stringSerde.serialize(aString, itemValue.getDataOutput());
-            listBuilder.addItem(itemValue);
-        }
-        fieldValue.reset();
-        listBuilder.write(fieldValue.getDataOutput(), true);
-        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
-
-        // write field 6
         fieldValue.reset();
         aString.setValue(Calendar.getInstance().getTime().toString());
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX, fieldValue);
 
+        // write field 5
+        Map<String, String> properties = feedActivity.getFeedActivityDetails();
+        UnorderedListBuilder listBuilder = new UnorderedListBuilder();
+        listBuilder
+                .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX]);
+        for (Map.Entry<String, String> property : properties.entrySet()) {
+            String name = property.getKey();
+            String value = property.getValue();
+            itemValue.reset();
+            writePropertyTypeRecord(name, value, itemValue.getDataOutput());
+            listBuilder.addItem(itemValue);
+        }
+        fieldValue.reset();
+        listBuilder.write(fieldValue.getDataOutput(), true);
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX, fieldValue);
+
         // write record
         try {
             recordBuilder.write(tupleBuilder.getDataOutput(), true);
@@ -203,4 +197,31 @@
         return tuple;
     }
 
+    public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
+        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
+        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+        propertyRecordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_DETAILS_RECORDTYPE);
+        AMutableString aString = new AMutableString("");
+        ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(BuiltinType.ASTRING);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(name);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(0, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(value);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(1, fieldValue);
+
+        try {
+            propertyRecordBuilder.write(out, true);
+        } catch (IOException | AsterixException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
index 05effd8..57d2d1c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
@@ -18,10 +18,13 @@
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
 import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -31,13 +34,15 @@
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.om.base.AInt32;
 import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AOrderedList;
+import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.ARecord;
 import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.AUnorderedList;
 import edu.uci.ics.asterix.om.base.IACursor;
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
@@ -88,7 +93,18 @@
         String description = ((AString) feedPolicyRecord
                 .getValueByPos(MetadataRecordTypes.FEED_POLICY_ARECORD_DESCRIPTION_FIELD_INDEX)).getStringValue();
 
+        IACursor cursor = ((AUnorderedList) feedPolicyRecord
+                .getValueByPos(MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX)).getCursor();
         Map<String, String> policyParamters = new HashMap<String, String>();
+        String key;
+        String value;
+        while (cursor.next()) {
+            ARecord field = (ARecord) cursor.get();
+            key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
+            value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
+            policyParamters.put(key, value);
+        }
+
         feedPolicy = new FeedPolicy(dataverseName, policyName, description, policyParamters);
         return feedPolicy;
     }
@@ -128,8 +144,10 @@
         recordBuilder.addField(MetadataRecordTypes.FEED_POLICY_ARECORD_POLICY_NAME_FIELD_INDEX, fieldValue);
 
         // write field 3 (properties)
+        Map<String, String> properties = feedPolicy.getProperties();
         UnorderedListBuilder listBuilder = new UnorderedListBuilder();
-        listBuilder.reset((AUnorderedListType) MetadataRecordTypes.FEED_POLICY_RECORDTYPE.getFieldTypes()[6]);
+        listBuilder
+                .reset((AUnorderedListType) MetadataRecordTypes.FEED_POLICY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX]);
         for (Map.Entry<String, String> property : properties.entrySet()) {
             String name = property.getKey();
             String value = property.getValue();
@@ -139,24 +157,6 @@
         }
         fieldValue.reset();
         listBuilder.write(fieldValue.getDataOutput(), true);
-        feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX, fieldValue);
-
-        fieldValue.reset();
-        Map<String, String> properties = new HashMap<String, String>();
-        String key;
-        String value;
-        IACursor cursor = ((AOrderedList) feedPolicy
-                .getValueByPos(MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX)).getCursor();
-
-        while (cursor.next()) {
-            ARecord field = (ARecord) cursor.get();
-            key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
-                    .getStringValue();
-            value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
-                    .getStringValue();
-            properties.put(key, value);
-        }
-
         recordBuilder.addField(MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX, fieldValue);
 
         // write record
@@ -171,4 +171,30 @@
         return tuple;
     }
 
-}
\ No newline at end of file
+    public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
+        IARecordBuilder propertyRecordBuilder = new RecordBuilder();
+        ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+        propertyRecordBuilder.reset(MetadataRecordTypes.POLICY_PARAMS_RECORDTYPE);
+        AMutableString aString = new AMutableString("");
+        ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+                .getSerializerDeserializer(BuiltinType.ASTRING);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(name);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(0, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(value);
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        propertyRecordBuilder.addField(1, fieldValue);
+
+        try {
+            propertyRecordBuilder.write(out, true);
+        } catch (IOException | AsterixException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
similarity index 96%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
index 440ee8c..476fdbb 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
@@ -12,11 +12,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
new file mode 100644
index 0000000..d350a9e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
@@ -0,0 +1,106 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.rmi.RemoteException;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+
+public abstract class AbstractFeedDatasourceAdapter extends AbstractDatasourceAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    private int batchPersistSize = 10;
+    protected FeedPolicyEnforcer policyEnforcer;
+    protected StatisticsCollector collector;
+    protected Timer timer;
+
+    public FeedPolicyEnforcer getPolicyEnforcer() {
+        return policyEnforcer;
+    }
+
+    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+        this.policyEnforcer = policyEnforcer;
+        FeedPolicyAccessor policyAccessor = this.policyEnforcer.getFeedPolicyAccessor();
+        if (policyAccessor.collectStatistics()) {
+            long period = policyAccessor.getStatisicsCollectionPeriodInSecs();
+            collector = new StatisticsCollector(this, policyEnforcer.getFeedId(), period, batchPersistSize);
+            timer = new Timer();
+            timer.schedule(collector, period * 1000, period * 1000);
+        }
+    }
+
+    public abstract long getIngestedRecordsCount();
+
+    protected void dumpStatistics() throws RemoteException, ACIDException {
+        collector.persistStatistics(true);
+    }
+
+    private static class StatisticsCollector extends TimerTask {
+
+        private final AbstractFeedDatasourceAdapter adapter;
+        private final FeedId feedId;
+        //   private List<Long> previousCountValues = new ArrayList<Long>();
+        //   private List<Integer> ingestionRates = new ArrayList<Integer>();
+        private final long period;
+        private FeedActivity feedActivity;
+        private int numMeasurements = 0;
+        private int batchPersistSize;
+        private StringBuilder count = new StringBuilder();
+        private StringBuilder rate = new StringBuilder();
+        private long previousCount = 0;
+
+        public StatisticsCollector(AbstractFeedDatasourceAdapter adapter, FeedId feedId, long period,
+                int batchPersistSize) {
+            this.adapter = adapter;
+            this.feedId = feedId;
+            this.period = period;
+            this.batchPersistSize = batchPersistSize;
+            this.feedActivity = new FeedActivity(feedId.getDataverse(), feedId.getDataset(),
+                    FeedActivityType.FEED_STATS, new HashMap<String, String>());
+        }
+
+        @Override
+        public void run() {
+            try {
+                persistStatistics(false);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+        public void persistStatistics(boolean force) throws RemoteException, ACIDException {
+            MetadataTransactionContext ctx = null;
+            try {
+                long currentCount = adapter.getIngestedRecordsCount();
+                long diff = (currentCount - previousCount);
+                count.append(currentCount + ",");
+                rate.append(new Double(diff / period).intValue() + ",");
+                feedActivity.getFeedActivityDetails().put(FeedActivityDetails.INGESTION_RATE, "" + rate.toString());
+                feedActivity.getFeedActivityDetails().put(FeedActivityDetails.TOTAL_INGESTED, "" + count.toString());
+                numMeasurements++;
+                if (numMeasurements == batchPersistSize || force) {
+                    numMeasurements = 0;
+                    ctx = MetadataManager.INSTANCE.beginTransaction();
+                    MetadataManager.INSTANCE.registerFeedActivity(ctx, feedId, feedActivity);
+                    MetadataManager.INSTANCE.commitTransaction(ctx);
+                    count.delete(0, count.length() - 1);
+                    rate.delete(0, rate.length() - 1);
+                }
+                previousCount = currentCount;
+            } catch (Exception e) {
+                if (ctx != null) {
+                    MetadataManager.INSTANCE.abortTransaction(ctx);
+                }
+            }
+        }
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AdapterIdentifier.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
similarity index 96%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AdapterIdentifier.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
index ac36733..ee63f69 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AdapterIdentifier.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.Serializable;
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/AlterFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/AlterFeedMessage.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
index 537bf07..a15c2b0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/AlterFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
@@ -12,10 +12,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
+
 /**
  * A feed control message containing the altered values for
  * adapter configuration parameters. This message is dispatched
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
new file mode 100644
index 0000000..9556ae0
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -0,0 +1,83 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+
+public class BuiltinFeedPolicies {
+
+    public static final FeedPolicy MISSTION_CRITICAL = initializeMissionCriticalFeedPolicy();
+
+    public static final FeedPolicy ADVANCED = initializeAdvancedFeedPolicy();
+
+    public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
+
+    public static final FeedPolicy BASIC = initializeBasicPolicy();
+
+    public static final FeedPolicy[] policies = new FeedPolicy[] { MISSTION_CRITICAL, ADVANCED, BASIC_MONITORED, BASIC };
+
+    public static final FeedPolicy DEFAULT_POLICY = BASIC_MONITORED;
+
+    public static final String CONFIG_FEED_POLICY_KEY = "policy";
+
+    public static FeedPolicy getFeedPolicy(String policyName) {
+        for (FeedPolicy policy : policies) {
+            if (policy.getPolicyName().equalsIgnoreCase(policyName)) {
+                return policy;
+            }
+        }
+        return null;
+    }
+
+    private static FeedPolicy initializeMissionCriticalFeedPolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "true");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        String description = "MissionCritical";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "MissionCritical", description, policyParams);
+    }
+
+    private static FeedPolicy initializeBasicPolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "false");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "false");
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        String description = "Basic";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
+    }
+
+    private static FeedPolicy initializeBasicMonitoredPolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "5");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        String description = "BasicMonitored";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicMonitored", description, policyParams);
+    }
+
+    private static FeedPolicy initializeAdvancedFeedPolicy() {
+        Map<String, String> policyParams = new HashMap<String, String>();
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
+        policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
+        policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
+        policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+        policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+        String description = "Advanced";
+        return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Advanced", description, policyParams);
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
similarity index 95%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index fb4cc99..d4ab54b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -12,12 +12,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.data.operator;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedActivityIdFactory.java
similarity index 90%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedActivityIdFactory.java
index b90f897..db1a62a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedActivityIdFactory.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedId.java
similarity index 96%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedId.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedId.java
index b1889ee..76ad775 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedId.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.Serializable;
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
new file mode 100644
index 0000000..b8dbefa
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Operator responsible for ingesting data from an external source. This
+ * operator uses a (configurable) adapter associated with the feed dataset.
+ */
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String adapterFactoryClassName;
+    private final Map<String, String> adapterConfiguration;
+    private final IAType atype;
+    private final FeedId feedId;
+    private final Map<String, String> feedPolicy;
+
+    private transient IAdapterFactory datasourceAdapterFactory;
+
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, String> arguments, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactoryClassName = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+        this.feedId = feedId;
+        this.feedPolicy = feedPolicy;
+    }
+
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        ITypedDatasourceAdapter adapter;
+        try {
+            datasourceAdapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassName).newInstance();
+            if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration, atype);
+            } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration);
+            } else {
+                throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+            }
+            adapter.initialize(ctx);
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, adapter, feedPolicy, partition);
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
new file mode 100644
index 0000000..0e6249c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/**
+ * The runtime for @see{FeedIntakeOperationDescriptor}
+ */
+public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private final IDatasourceAdapter adapter;
+    private final int partition;
+    private final IFeedManager feedManager;
+    private final FeedId feedId;
+    private final LinkedBlockingQueue<IFeedMessage> inbox;
+    private FeedInboxMonitor feedInboxMonitor;
+    private final Map<String, String> feedPolicy;
+    private final FeedPolicyEnforcer policyEnforcer;
+
+    public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, Map<String, String> feedPolicy,
+            int partition) {
+        this.adapter = adapter;
+        this.partition = partition;
+        this.feedManager = (IFeedManager) FeedManager.INSTANCE;
+        this.feedId = feedId;
+        inbox = new LinkedBlockingQueue<IFeedMessage>();
+        this.feedPolicy = feedPolicy;
+        policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
+
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (adapter instanceof IManagedFeedAdapter) {
+            feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
+            feedInboxMonitor.start();
+            feedManager.registerFeedMsgQueue(feedId, inbox);
+        }
+        writer.open();
+        try {
+            ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+            adapter.start(partition, writer);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+            if (adapter instanceof IManagedFeedAdapter) {
+                try {
+                    ((IManagedFeedAdapter) adapter).stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                feedManager.unregisterFeedMsgQueue(feedId, inbox);
+            }
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // do nothing
+    }
+
+    public Map<String, String> getFeedPolicy() {
+        return feedPolicy;
+    }
+}
+
+class FeedInboxMonitor extends Thread {
+
+    private LinkedBlockingQueue<IFeedMessage> inbox;
+    private final IManagedFeedAdapter adapter;
+
+    public FeedInboxMonitor(IManagedFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+        this.inbox = inbox;
+        this.adapter = adapter;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                IFeedMessage feedMessage = inbox.take();
+                switch (feedMessage.getMessageType()) {
+                    case STOP:
+                        adapter.stop();
+                        break;
+                    case ALTER:
+                        adapter.alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
+                        break;
+                }
+            } catch (InterruptedException ie) {
+                break;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
similarity index 89%
rename from asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
index 0d044fe..f7e1325 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.metadata.declared;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.Serializable;
 import java.rmi.RemoteException;
@@ -25,8 +25,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 import edu.uci.ics.asterix.common.api.AsterixAppContextInfo;
-import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -50,6 +48,8 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 
+//import edu.uci.ics.hyracks.api.job.JobInfo;
+
 public class FeedJobLifecycleListener implements IJobLifecycleListener, Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -82,9 +82,10 @@
 
     @Override
     public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+
         IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, AsterixAppContextInfo
                 .getInstance().getCCApplicationContext(), EnumSet.noneOf(JobFlag.class));
-        JobSpecification spec = acgg.getJobSpecification();
+        JobSpecification spec = acggf.getJobSpecification();
         boolean feedIngestionJob = false;
         FeedId feedId = null;
         for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
@@ -98,6 +99,7 @@
         if (feedIngestionJob) {
             feedJobNotificationHandler.registerFeed(feedId, jobId, spec);
         }
+
     }
 
     private static class Message {
@@ -160,6 +162,7 @@
         }
 
         private void handleJobStartMessage(FeedInfo feedInfo, Message message) {
+
             JobSpecification jobSpec = feedInfo.jobSpec;
 
             List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
@@ -184,9 +187,12 @@
                 IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
                 JobInfo info = hcc.getJobInfo(message.jobId);
 
+                Map<String, String> feedActivityDetails = new HashMap<String, String>();
+                StringBuilder ingestLocs = new StringBuilder();
                 for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
                     feedInfo.ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
                 }
+                StringBuilder computeLocs = new StringBuilder();
                 for (OperatorDescriptorId computeOpId : computeOperatorIds) {
                     List<String> locations = info.getOperatorLocations().get(computeOpId);
                     if (locations != null) {
@@ -195,9 +201,21 @@
                         feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
                     }
                 }
+
+                for (String ingestLoc : feedInfo.ingestLocations) {
+                    ingestLocs.append(ingestLoc);
+                    ingestLocs.append(",");
+                }
+                for (String computeLoc : feedInfo.computeLocations) {
+                    computeLocs.append(computeLoc);
+                    computeLocs.append(",");
+                }
+
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
+                feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
+
                 FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
-                        feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedInfo.ingestLocations,
-                        feedInfo.computeLocations);
+                        feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedActivityDetails);
 
                 MetadataManager.INSTANCE.acquireWriteLatch();
                 MetadataTransactionContext mdTxnCtx = null;
@@ -214,9 +232,11 @@
             } catch (Exception e) {
                 // TODO Add Exception handling here
             }
+
         }
 
         private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
+
             MetadataManager.INSTANCE.acquireWriteLatch();
             MetadataTransactionContext mdTxnCtx = null;
 
@@ -224,16 +244,18 @@
                 IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
                 JobInfo info = hcc.getJobInfo(message.jobId);
                 JobStatus status = info.getPendingStatus();
-                Exception e;
+                List<Exception> exceptions;
                 boolean failure = status != null && status.equals(JobStatus.FAILURE);
                 FeedActivityType activityType = FeedActivityType.FEED_END;
+                Map<String, String> details = new HashMap<String, String>();
                 if (failure) {
-                    e = info.getPendingException();
+                    exceptions = info.getPendingExceptions();
                     activityType = FeedActivityType.FEED_FAILURE;
+                    details.put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE, exceptions.get(0).getMessage());
                 }
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
-                        feedInfo.feedId.getDataset(), activityType, feedInfo.ingestLocations, feedInfo.computeLocations);
+                        feedInfo.feedId.getDataset(), activityType, details);
                 MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedId(feedInfo.feedId.getDataverse(),
                         feedInfo.feedId.getDataset()), feedActivity);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -248,8 +270,8 @@
             } finally {
                 MetadataManager.INSTANCE.releaseWriteLatch();
             }
-        }
 
+        }
     }
 
     private static class FeedInfo {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 8314267..eb47287 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -21,6 +21,9 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
+import edu.uci.ics.asterix.metadata.feeds.IFeedManager;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
 
 /**
  * Handle (de-)registration of feeds for delivery of control messages.
@@ -43,7 +46,7 @@
                 for (LinkedBlockingQueue<IFeedMessage> queue : operatorQueues) {
                     queue.put(feedMessage);
                 }
-            } 
+            }
         } catch (Exception e) {
             throw new AsterixException(e);
         }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedMessage.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
index af84d4f..68c2017 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
@@ -12,7 +12,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
 
 /**
  * A control message that can be sent to the runtime instance of a 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorDescriptor.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index d0dc5ca..bbd8869 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -12,12 +12,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.data.operator;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.List;
 
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
similarity index 86%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorNodePushable.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index d03eeaa..b51ffed 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -12,15 +12,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.data.operator;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
new file mode 100644
index 0000000..3e16877
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
@@ -0,0 +1,81 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+
+public class FeedPolicyAccessor {
+    public static final String APPLICATION_FAILURE_PERSIST_EXCEPTION = "software.failure.persist.exception";
+    public static final String APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION = "software.failure.continue.on.exception";
+    public static final String HARDWARE_FAILURE_AUTO_RESTART = "hardware.failure.auto.restart";
+    public static final String STATISTICS_COLLECT = "statistics.collect";
+    public static final String STATISTICS_COLLECT_PERIOD = "statistics.collect.period";
+    public static final String STATISTICS_COLLECT_PERIOD_UNIT = "statistics.collect.period.unit";
+    public static final String ELASTIC = "elastic";
+
+    public enum TimeUnit {
+        SEC,
+        MIN,
+        HRS,
+        DAYS
+    }
+
+    private Map<String, String> feedPolicy;
+
+    public FeedPolicyAccessor(Map<String, String> feedPolicy) {
+        this.feedPolicy = feedPolicy;
+    }
+
+    public boolean persistExceptionDetailsOnApplicationFailure() {
+        return getBooleanPropertyValue(APPLICATION_FAILURE_PERSIST_EXCEPTION);
+    }
+
+    public boolean continueOnApplicationFailure() {
+        return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION);
+    }
+
+    public boolean autoRestartOnHardwareFailure() {
+        return getBooleanPropertyValue(HARDWARE_FAILURE_AUTO_RESTART);
+    }
+
+    public boolean collectStatistics() {
+        return getBooleanPropertyValue(STATISTICS_COLLECT);
+    }
+
+    public long getStatisicsCollectionPeriodInSecs() {
+        return getIntegerPropertyValue(STATISTICS_COLLECT_PERIOD) * getTimeUnitFactor();
+    }
+
+    public boolean isElastic() {
+        return getBooleanPropertyValue(ELASTIC);
+    }
+
+    private int getTimeUnitFactor() {
+        String v = feedPolicy.get(STATISTICS_COLLECT_PERIOD_UNIT);
+        int factor = 1;
+        switch (TimeUnit.valueOf(v)) {
+            case SEC:
+                factor = 1;
+                break;
+            case MIN:
+                factor = 60;
+                break;
+            case HRS:
+                factor = 3600;
+                break;
+            case DAYS:
+                factor = 216000;
+                break;
+
+        }
+        return factor;
+    }
+
+    private boolean getBooleanPropertyValue(String key) {
+        String v = feedPolicy.get(key);
+        return v == null ? false : Boolean.valueOf(v);
+    }
+
+    private int getIntegerPropertyValue(String key) {
+        String v = feedPolicy.get(key);
+        return Integer.parseInt(v);
+    }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
new file mode 100644
index 0000000..7eeee4c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.rmi.RemoteException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+
+public class FeedPolicyEnforcer {
+
+    private final FeedId feedId;
+    private final FeedPolicyAccessor feedPolicyAccessor;
+    private final FeedActivity feedActivity;
+
+    public FeedPolicyEnforcer(FeedId feedId, Map<String, String> feedPolicy) {
+        this.feedId = feedId;
+        this.feedPolicyAccessor = new FeedPolicyAccessor(feedPolicy);
+        this.feedActivity = new FeedActivity(feedId.getDataverse(), feedId.getDataset(), null,
+                new HashMap<String, String>());
+    }
+
+    public boolean handleSoftwareFailure(Exception e) throws RemoteException, ACIDException {
+        boolean continueIngestion = feedPolicyAccessor.continueOnApplicationFailure();
+        if (feedPolicyAccessor.persistExceptionDetailsOnApplicationFailure()) {
+            persistExceptionDetails(e);
+        }
+        return continueIngestion;
+    }
+
+    private synchronized void persistExceptionDetails(Exception e) throws RemoteException, ACIDException {
+        MetadataManager.INSTANCE.acquireWriteLatch();
+        MetadataTransactionContext ctx = null;
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            feedActivity.setActivityType(FeedActivityType.FEED_FAILURE);
+            feedActivity.getFeedActivityDetails().put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE,
+                    e.getMessage());
+            MetadataManager.INSTANCE.registerFeedActivity(ctx, feedId, feedActivity);
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e2) {
+            MetadataManager.INSTANCE.abortTransaction(ctx);
+        } finally {
+            MetadataManager.INSTANCE.releaseWriteLatch();
+        }
+    }
+
+    public void handleHardwareFailure(List<String> nodeId) {
+
+    }
+
+    public FeedPolicyAccessor getFeedPolicyAccessor() {
+        return feedPolicyAccessor;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
similarity index 94%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
index 45fd6cf..e5f7e97 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.adapter.factory;
+package edu.uci.ics.asterix.metadata.feeds;
 
 /**
  * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
similarity index 98%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
index b0dc32f..5d44dbd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.Serializable;
 import java.util.Map;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
similarity index 97%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedManager.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index 587d5a7..6e6e1ac 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -56,4 +56,5 @@
      * @throws Exception
      */
     public void deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws AsterixException;
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
similarity index 93%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedMessage.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
index 9e1e907..dc50071 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.io.Serializable;
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericDatasetAdapterFactory.java
similarity index 92%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericDatasetAdapterFactory.java
index 093a3dd..8f8f496 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericDatasetAdapterFactory.java
@@ -12,11 +12,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.adapter.factory;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
 
 /**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IManagedFeedAdapter.java
similarity index 93%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IManagedFeedAdapter.java
index 6f993ae..51a3272 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IManagedFeedAdapter.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.feed.managed.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.Map;
 
@@ -27,7 +27,7 @@
      * 
      * @throws Exception
      */
-    public void stop();
+    public void stop() throws Exception;
 
     /**
      * Modify the adapter configuration parameters. This method is called
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasetAdapterFactory.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasetAdapterFactory.java
index 0f9978e..38e4fe4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasetAdapterFactory.java
@@ -12,12 +12,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.adapter.factory;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
-
 /**
  * A base interface for an adapter factory that creates instance of an adapter kind that
  * is 'typed' in nature. A 'typed' adapter returns records with a pre-defined datatype.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ITypedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasourceAdapter.java
similarity index 89%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ITypedDatasourceAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasourceAdapter.java
index 3a4b97b..226ac0f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ITypedDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasourceAdapter.java
@@ -12,8 +12,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
 
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 
 /**
diff --git a/asterix-tools/pom.xml b/asterix-tools/pom.xml
index 248cdb5..f9c4219 100644
--- a/asterix-tools/pom.xml
+++ b/asterix-tools/pom.xml
@@ -134,6 +134,12 @@
 			<scope>compile</scope>
 		</dependency>
 		<dependency>
+			<groupId>edu.uci.ics.asterix</groupId>
+			<artifactId>asterix-external-data</artifactId>
+			<version>0.0.6-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.httpcomponents</groupId>
 			<artifactId>httpclient</artifactId>
 			<version>4.2.2</version>
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index 8996a85..39537a8 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -10,7 +10,6 @@
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index a522336..4ffe61a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,10 +19,10 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..5b5bb47 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,8 +16,8 @@
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index f66d9e1..c8dcf20 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -75,7 +75,7 @@
 
     @Override
     public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
-        return new OkSyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
+        return new SyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
     }
 
     @Override
@@ -83,24 +83,26 @@
         return new AlgebricksCountPartitionConstraint(1);
     }
 
-    private static class OkSyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
+    private static class SyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
 
-        private static final Logger LOGGER = Logger.getLogger(OkSyntheticTwitterFeedClient.class.getName());
+        private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedClient.class.getName());
 
         public static final String KEY_DURATION = "duration";
         public static final String KEY_TPS = "tps";
+        public static final String KEY_EXCEPTION_PERIOD = "exception-period";
 
         private int duration;
         private long tweetInterval;
         private int numTweetsBeforeDelay;
         private TweetMessageIterator tweetIterator = null;
+        private long exeptionInterval;
 
         private IAObject[] mutableFields;
         private ARecordType outputRecordType;
         private int partition;
         private int tweetCount = 0;
 
-        public OkSyntheticTwitterFeedClient(Map<String, String> configuration, ARecordType outputRecordType,
+        public SyntheticTwitterFeedClient(Map<String, String> configuration, ARecordType outputRecordType,
                 int partition) throws AsterixException {
             this.outputRecordType = outputRecordType;
             String value = configuration.get(KEY_DURATION);
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 5883a62..897091c 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2012 by The Regents of the University of California
+x * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -16,8 +16,8 @@
 
 import java.util.Map;
 
-import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
 
 /**
  * Factory class for creating @see{RateControllerFileSystemBasedAdapter} The