checkpoint
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index ab3516a..d7c65c8 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -31,6 +31,7 @@
 import edu.uci.ics.asterix.optimizer.rules.InlineUnnestFunctionRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceFeedInterceptOperatorRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
@@ -209,6 +210,7 @@
         accessMethod.add(new RemoveUnusedOneToOneEquiJoinRule());
         accessMethod.add(new PushSimilarityFunctionsBelowJoin());
         accessMethod.add(new RemoveUnusedAssignAndAggregateRule());
+        accessMethod.add(new IntroduceFeedInterceptOperatorRule());
         return accessMethod;
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index cfb767f..2953e42 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -179,6 +179,54 @@
 
                 return true;
             }
+
+            if (fid.equals(AsterixBuiltinFunctions.FEED_INTERCEPT)) {
+                if (unnest.getPositionalVariable() != null) {
+                    throw new AlgebricksException("No positional variables are allowed over datasets.");
+                }
+                ILogicalExpression expr = f.getArguments().get(0).getValue();
+                if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+                    return false;
+                }
+                ConstantExpression ce = (ConstantExpression) expr;
+                IAlgebricksConstantValue acv = ce.getValue();
+                if (!(acv instanceof AsterixConstantValue)) {
+                    return false;
+                }
+                AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+                if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+                    return false;
+                }
+                String datasetArg = ((AString) acv2.getObject()).getStringValue();
+
+                AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
+                Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
+                String dataverseName = datasetReference.first;
+                String datasetName = datasetReference.second;
+                Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+                if (dataset == null) {
+                    throw new AlgebricksException("Could not find dataset " + datasetName);
+                }
+
+                if (dataset.getDatasetType() != DatasetType.FEED) {
+                    throw new IllegalArgumentException("invalid dataset type:" + dataset.getDatasetType());
+                }
+
+                AqlSourceId asid = new AqlSourceId(dataverseName, datasetName);
+                ArrayList<LogicalVariable> v = new ArrayList<LogicalVariable>();
+                v.add(unnest.getVariable());
+
+                DataSourceScanOperator scan = new DataSourceScanOperator(v, createDummyFeedInterceptDataSource(asid,
+                        dataset, metadataProvider));
+
+                List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+                scanInpList.addAll(unnest.getInputs());
+                opRef.setValue(scan);
+                addPrimaryKey(v, context);
+                context.computeAndSetTypeEnvironmentForOperator(scan);
+
+                return true;
+            }
         }
 
         return false;
@@ -208,6 +256,20 @@
         return extDataSource;
     }
 
+    private AqlDataSource createDummyFeedInterceptDataSource(AqlSourceId aqlId, Dataset dataset,
+            AqlMetadataProvider metadataProvider) throws AlgebricksException {
+        if (!aqlId.getDataverseName().equals(
+                metadataProvider.getDefaultDataverse() == null ? null : metadataProvider.getDefaultDataverse()
+                        .getDataverseName())) {
+            return null;
+        }
+        String tName = dataset.getItemTypeName();
+        IAType itemType = metadataProvider.findType(dataset.getDataverseName(), tName);
+        ExternalFeedDataSource extDataSource = new ExternalFeedDataSource(aqlId, dataset, itemType,
+                AqlDataSource.AqlDataSourceType.FEED_INTERCEPT);
+        return extDataSource;
+    }
+
     private Pair<String, String> parseDatasetReference(AqlMetadataProvider metadataProvider, String datasetArg)
             throws AlgebricksException {
         String[] datasetNameComponents = datasetArg.split("\\.");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 2292cdf..f1d1dfa 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -94,6 +94,7 @@
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -639,7 +640,7 @@
             if (ds.getDatasetType().equals(DatasetType.FEED)) {
                 FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
                         null);
-                boolean activeFeed = FeedOperations.isFeedActive(fa);
+                boolean activeFeed = FeedUtil.isFeedActive(fa);
                 if (activeFeed) {
                     throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
                             + " Operation not supported.");
@@ -923,7 +924,7 @@
             if (ds.getDatasetType().equals(DatasetType.FEED)) {
                 FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
                         null);
-                boolean activeFeed = FeedOperations.isFeedActive(fa);
+                boolean activeFeed = FeedUtil.isFeedActive(fa);
                 if (activeFeed) {
                     throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
                             + " Operation not supported.");
@@ -1041,7 +1042,7 @@
             if (ds.getDatasetType().equals(DatasetType.FEED)) {
                 FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
                         null);
-                boolean activeFeed = FeedOperations.isFeedActive(fa);
+                boolean activeFeed = FeedUtil.isFeedActive(fa);
                 if (activeFeed) {
                     throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
                             + " Operation not supported.");
@@ -1417,7 +1418,7 @@
 
             FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, bfs
                     .getDatasetName().getValue(), null);
-            boolean isFeedActive = FeedOperations.isFeedActive(recentActivity);
+            boolean isFeedActive = FeedUtil.isFeedActive(recentActivity);
             if (isFeedActive && !bfs.isForceBegin()) {
                 throw new AsterixException("Feed " + bfs.getDatasetName().getValue()
                         + " is currently ACTIVE. Operation not supported");
@@ -1480,7 +1481,7 @@
             FeedActivity feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName,
                     datasetName, FeedActivityType.FEED_BEGIN, FeedActivityType.FEED_RESUME);
 
-            boolean isFeedActive = FeedOperations.isFeedActive(feedActivity);
+            boolean isFeedActive = FeedUtil.isFeedActive(feedActivity);
             if (!isFeedActive) {
                 throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
                         + " is currently INACTIVE. Operation not supported");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index 024e660..503f565 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -71,10 +71,7 @@
         }
     }
 
-    public static boolean isFeedActive(FeedActivity feedActivity) {
-        return (feedActivity != null && (!feedActivity.getActivityType().equals(FeedActivityType.FEED_END) && !feedActivity
-                .getActivityType().equals(FeedActivityType.FEED_FAILURE)));
-    }
+  
 
     private static JobSpecification createSendMessageToFeedJobSpec(CompiledControlFeedStatement controlFeedStatement,
             AqlMetadataProvider metadataProvider, FeedActivity feedActivity) throws AsterixException {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index e1eb8c4..921e93e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -126,8 +126,6 @@
     @Override
     public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
 
-        IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, AsterixAppContextInfo
-                .getInstance().getCCApplicationContext(), EnumSet.noneOf(JobFlag.class));
         JobSpecification spec = acggf.getJobSpecification();
         boolean feedIngestionJob = false;
         FeedId feedId = null;
@@ -143,6 +141,10 @@
         }
         if (feedIngestionJob) {
             feedJobNotificationHandler.registerFeed(feedId, jobId, spec, feedPolicy);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Registered feed: " + feedId + " ingestion policy "
+                        + feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+            }
         }
 
     }
@@ -316,10 +318,8 @@
         }
 
         private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
-
             MetadataManager.INSTANCE.acquireWriteLatch();
             MetadataTransactionContext mdTxnCtx = null;
-
             try {
                 IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
                 JobInfo info = hcc.getJobInfo(message.jobId);
@@ -350,7 +350,6 @@
             } finally {
                 MetadataManager.INSTANCE.releaseWriteLatch();
             }
-
         }
     }
 
@@ -369,6 +368,19 @@
             this.feedPolicy = feedPolicy;
         }
 
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof FeedInfo)) {
+                return false;
+            }
+            return ((FeedInfo) o).feedId.equals(feedId);
+        }
+
+        @Override
+        public int hashCode() {
+            return feedId.hashCode();
+        }
+
     }
 
     @Override
@@ -540,6 +552,7 @@
         FeedActivity fa = null;
         Map<String, String> feedActivityDetails = new HashMap<String, String>();
         StringBuilder builder = new StringBuilder();
+        MetadataManager.INSTANCE.acquireWriteLatch();
         try {
             ctx = MetadataManager.INSTANCE.beginTransaction();
             for (Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
@@ -564,6 +577,8 @@
                     throw new IllegalStateException("Unable to abort transaction " + e2);
                 }
             }
+        } finally {
+            MetadataManager.INSTANCE.releaseWriteLatch();
         }
     }
 
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index b1bf69d..801082b 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -64,7 +64,7 @@
 
         // TODO: Uncomment when hadoop version is upgraded and adapters are
         // ported. 
-        HDFSCluster.getInstance().setup();
+    //    HDFSCluster.getInstance().setup();
 
         // Set the node resolver to be the identity resolver that expects node names 
         // to be node controller ids; a valid assumption in test environment. 
diff --git a/asterix-app/src/test/resources/hadoop/conf/core-site.xml b/asterix-app/src/test/resources/hadoop/conf/core-site.xml
index 5b1023c..433325a 100644
--- a/asterix-app/src/test/resources/hadoop/conf/core-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/core-site.xml
@@ -21,7 +21,7 @@
 
 <property>
     <name>fs.default.name</name>
-    <value>hdfs://127.0.0.1:31888</value>
+    <value>hdfs://192.168.0.103:31888</value>
 </property>
 <property>
     <name>hadoop.tmp.dir</name>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
index b9ed170..68d03bf 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
@@ -307,8 +307,9 @@
         JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
         Marshaller marshaller = ctx.createMarshaller();
         marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
-        marshaller.marshal(configuration, new FileOutputStream(asterixConfDir + File.separator
-                + ASTERIX_CONFIGURATION_FILE));
+        FileOutputStream os = new FileOutputStream(asterixConfDir + File.separator + ASTERIX_CONFIGURATION_FILE);
+        marshaller.marshal(configuration, os);
+        os.close();
     }
 
     private static void writeAsterixLogConfigurationFile(AsterixInstance asterixInstance, Properties logProperties)
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java
index 7063c76..5c059bb 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/ZooKeeperService.java
@@ -109,7 +109,7 @@
             msg.append("1) Managix is incorrectly configured. Please run " + "managix validate"
                     + " to run a validation test and correct the errors reported.");
             msg.append("\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration ("
-                    + null + File.separator + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
+                    + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
             throw new Exception(msg.toString());
         }
         msgQ.take();
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java
index 50e13ac..041839d 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/VersionCommand.java
@@ -23,7 +23,7 @@
 
     @Override
     protected void execCommand() throws Exception {
-        InstallerDriver.initConfig(true);
+        InstallerDriver.initConfig(false);
         String asterixZipName = AsterixEventService.getAsterixZip().substring(
                 AsterixEventService.getAsterixZip().lastIndexOf(File.separator) + 1);
         String asterixVersion = asterixZipName.substring("asterix-server-".length(),
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index e3e4cb7..d9c9d2f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -60,7 +60,8 @@
         INTERNAL,
         FEED,
         EXTERNAL,
-        EXTERNAL_FEED
+        EXTERNAL_FEED,
+        FEED_INTERCEPT
     }
 
     public AqlDataSource(AqlSourceId id, Dataset dataset, IAType itemType, AqlDataSourceType datasourceType)
@@ -76,6 +77,7 @@
                     initInternalDataset(itemType);
                     break;
                 }
+                case FEED_INTERCEPT:
                 case EXTERNAL_FEED:
                 case EXTERNAL: {
                     initExternalDataset(itemType);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 4eeb072..b3dbf48 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -52,7 +52,9 @@
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -60,7 +62,9 @@
 import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedId;
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedInterceptScanOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
@@ -323,11 +327,56 @@
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
                 .getDatatype();
-        if (dataSource instanceof ExternalFeedDataSource) {
-            return buildFeedIntakeRuntime(jobSpec, dataset, dataSource);
-        } else {
-            return buildExternalDataScannerRuntime(jobSpec, itemType,
-                    (ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
+        switch (((AqlDataSource) dataSource).getDatasourceType()) {
+            case EXTERNAL_FEED:
+                return buildFeedIntakeRuntime(jobSpec, dataset, dataSource);
+            case EXTERNAL:
+                return buildExternalDataScannerRuntime(jobSpec, itemType,
+                        (ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
+            case FEED_INTERCEPT:
+                return buildFeedInterceptRuntime(jobSpec, dataset, dataSource);
+            default:
+                throw new IllegalStateException("Unknown aql datasource type: "
+                        + ((AqlDataSource) dataSource).getDatasourceType());
+        }
+    }
+
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedInterceptRuntime(
+            JobSpecification jobSpec, Dataset dataset, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+        FeedActivity feedActivity = null;
+        try {
+            feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataset.getDataverseName(),
+                    dataset.getDatasetName(), null);
+            if (!FeedUtil.isFeedActive(feedActivity)) {
+                throw new AlgebricksException("Source feed " + dataset.getDataverseName() + ":"
+                        + dataset.getDatasetName() + " is not " + FeedState.ACTIVE);
+            }
+            String dataTypeName = dataset.getItemTypeName();
+            Datatype datatype = MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, dataset.getDataverseName(), dataTypeName);
+            ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+                    .getSerializerDeserializer(datatype.getDatatype());
+
+            RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+            FeedInterceptScanOperatorDescriptor feedInterceptScan = new FeedInterceptScanOperatorDescriptor(jobSpec,
+                    scannerDesc, new FeedId(dataset.getDataverseName(), dataset.getDatasetName()));
+
+            FeedActivity beginFeedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx,
+                    dataset.getDataverseName(), dataset.getDatasetName(), FeedActivityType.FEED_BEGIN);
+            String[] computeLocations = beginFeedActivity.getFeedActivityDetails()
+                    .get(FeedActivityDetails.COMPUTE_LOCATIONS).split(",");
+            AlgebricksPartitionConstraint constraint;
+            try {
+                constraint = new AlgebricksAbsolutePartitionConstraint(computeLocations);
+            } catch (Exception e) {
+                throw new AlgebricksException(e);
+            }
+
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedInterceptScan, constraint);
+
+        } catch (Exception e) {
+            throw new AlgebricksException("Unable to create feed intercept scan runtime", e);
         }
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index b3321cb..2b04363 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -31,6 +31,7 @@
     }
 
     private Map<FeedId, List<AdapterRuntimeManager>> activeFeedRuntimeManagers = new HashMap<FeedId, List<AdapterRuntimeManager>>();
+    private Map<FeedId, IFeedDistributor> activeFeedDistributors = new HashMap<FeedId, IFeedDistributor>();
 
     @Override
     public synchronized void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
@@ -67,8 +68,22 @@
         return null;
     }
 
+    @Override
+    public synchronized void registerSourceFeed(FeedId feedId, IFeedDistributor feedDistributor) {
+        activeFeedDistributors.put(feedId, feedDistributor);
+    }
+
+    @Override
+    public synchronized void deRegisterSourceFeed(FeedId feedId) {
+        activeFeedDistributors.remove(feedId);
+    }
+
     public List<AdapterRuntimeManager> getFeedRuntimeManagers(FeedId feedId) {
         return activeFeedRuntimeManagers.get(feedId);
     }
 
+    public IFeedDistributor getFeedDistributor(FeedId feedId) {
+        return activeFeedDistributors.get(feedId);
+    }
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index 46302e8..cbabd7e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -36,4 +36,15 @@
      */
     public AdapterRuntimeManager getFeedRuntimeManager(FeedId feedId, int partition);
 
+    /**
+     * @param feedId
+     */
+    public void deRegisterSourceFeed(FeedId feedId);
+
+    /**
+     * @param feedId
+     * @param feedDistributor
+     */
+    void registerSourceFeed(FeedId feedId, IFeedDistributor feedDistributor);
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
index d5d4cc2..639b77a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/functions/MetadataBuiltinFunctions.java
@@ -40,6 +40,8 @@
         AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.DATASET);
         AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INGEST, false);
         AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INGEST);
+        AsterixBuiltinFunctions.addUnnestFun(AsterixBuiltinFunctions.FEED_INTERCEPT, false);
+        AsterixBuiltinFunctions.addDatasetFunction(AsterixBuiltinFunctions.FEED_INTERCEPT);
     }
 
     public static void addMetadataBuiltinFunctions() {
@@ -129,6 +131,49 @@
                 return t2;
             }
         });
+        
+        AsterixBuiltinFunctions.addFunction(AsterixBuiltinFunctions.FEED_INTERCEPT, new IResultTypeComputer() {
+
+            @Override
+            public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+                    IMetadataProvider<?, ?> mp) throws AlgebricksException {
+                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+                if (f.getArguments().size() != 1) {
+                    throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
+                }
+                ILogicalExpression a1 = f.getArguments().get(0).getValue();
+                IAType t1 = (IAType) env.getType(a1);
+                if (t1.getTypeTag() == ATypeTag.ANY) {
+                    return BuiltinType.ANY;
+                }
+                if (t1.getTypeTag() != ATypeTag.STRING) {
+                    throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
+                }
+                if (a1.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+                    return BuiltinType.ANY;
+                }
+                AsterixConstantValue acv = (AsterixConstantValue) ((ConstantExpression) a1).getValue();
+                String datasetArg = ((AString) acv.getObject()).getStringValue();
+                AqlMetadataProvider metadata = ((AqlMetadataProvider) mp);
+                Pair<String, String> datasetInfo = getDatasetInfo(metadata, datasetArg);
+                String dataverseName = datasetInfo.first;
+                String datasetName = datasetInfo.second;
+                if (dataverseName == null) {
+                    throw new AlgebricksException("Unspecified dataverse!");
+                }
+                Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+                if (dataset == null) {
+                    throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse "
+                            + dataverseName);
+                }
+                String tn = dataset.getItemTypeName();
+                IAType t2 = metadata.findType(dataverseName, tn);
+                if (t2 == null) {
+                    throw new AlgebricksException("No type for dataset " + datasetName);
+                }
+                return t2;
+            }
+        });
     }
 
     private static Pair<String, String> getDatasetInfo(AqlMetadataProvider metadata, String datasetArg) {
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 1a2a0db..e3be0df 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -194,8 +194,7 @@
             "numeric-idiv", 2);
     public final static FunctionIdentifier CARET = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "caret", 2);
 
-    public final static FunctionIdentifier NUMERIC_ABS = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
-            "abs", 1);
+    public final static FunctionIdentifier NUMERIC_ABS = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "abs", 1);
     public final static FunctionIdentifier NUMERIC_CEILING = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "ceiling", 1);
     public final static FunctionIdentifier NUMERIC_FLOOR = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -243,6 +242,8 @@
     public final static FunctionIdentifier DATASET = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset", 1);
     public final static FunctionIdentifier FEED_INGEST = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "feed-ingest", 1);
+    public final static FunctionIdentifier FEED_INTERCEPT = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "feed-intercept", 1);
 
     public final static FunctionIdentifier INDEX_SEARCH = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "index-search", FunctionIdentifier.VARARGS);
@@ -424,7 +425,8 @@
             "interval-overlaps", 2);
     public final static FunctionIdentifier INTERVAL_OVERLAPPED_BY = new FunctionIdentifier(
             FunctionConstants.ASTERIX_NS, "interval-overlapped-by", 2);
-    public final static FunctionIdentifier OVERLAP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "interval-overlapping", 2);
+    public final static FunctionIdentifier OVERLAP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
+            "interval-overlapping", 2);
     public final static FunctionIdentifier INTERVAL_STARTS = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
             "interval-starts", 2);
     public final static FunctionIdentifier INTERVAL_STARTED_BY = new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
@@ -958,12 +960,14 @@
     static {
         datasetFunctions.add(getAsterixFunctionInfo(DATASET));
         datasetFunctions.add(getAsterixFunctionInfo(FEED_INGEST));
+        datasetFunctions.add(getAsterixFunctionInfo(FEED_INTERCEPT));
         datasetFunctions.add(getAsterixFunctionInfo(INDEX_SEARCH));
     }
 
     static {
         addUnnestFun(DATASET, false);
         addUnnestFun(FEED_INGEST, false);
+        addUnnestFun(FEED_INTERCEPT, false);
         addUnnestFun(RANGE, true);
         addUnnestFun(SCAN_COLLECTION, false);
         addUnnestFun(SUBSET_COLLECTION, false);