Introduces Feeds 2.0

commit c3f577861fc705d848c1641605689cadd6973bae
Merge: ebc4cae fc0c2c0
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Fri Jun 26 13:04:05 2015 -0700

    Merge branch 'raman/feeds_2_release' of https://code.google.com/p/asterixdb-sandbox into raman/feeds_2_release

    Conflicts:
    	asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
    	asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java

commit ebc4cae21a7302869f953df1ebda601e798d12d2
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Sat Jun 20 17:14:45 2015 -0700

    Introduces Feeds 2.0

    Some of the prominent chnages introduced are as follows
     a) Support for building a cascade network of feeds (via secondary feeds feature)
     b) Feed Management Console for tracking active feeds and associated metrics
     c) Support for elastic runtime for data ingestion
     d) Improved fault-tolerance with support for logging of failed records

    Documentation has been added at asterix-doc/src/site/markdown/feeds/

commit fc0c2c0549a6ee8b202e57607d2e110478cd57bb
Author: ramangrover29 <ramangrover29@gmail.com>
Date:   Sat Jun 20 17:14:45 2015 -0700

    Introduces Feeds 2.0

    Some of the prominent chnages introduced are as follows
     a) Support for building a cascade network of feeds (via secondary feeds feature)
     b) Feed Management Console for tracking active feeds and associated metrics
     c) Support for elastic runtime for data ingestion
     d) Improved fault-tolerance with support for logging of failed records

    Documentation has been added at asterix-doc/src/site/markdown/feeds/

Change-Id: I498f01c591a229aaf51cec43ab20f3e5c4f072f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/297
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
index abac9df..b9c96d1 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
@@ -37,10 +37,14 @@
         INDEX_DECL,
         CREATE_DATAVERSE,
         INDEX_DROP,
-        CREATE_FEED,
+        CREATE_PRIMARY_FEED,
+        CREATE_SECONDARY_FEED,
         DROP_FEED,
         CONNECT_FEED,
         DISCONNECT_FEED,
+        SUBSCRIBE_FEED,
+        CREATE_FEED_POLICY,
+        DROP_FEED_POLICY,
         CREATE_FUNCTION,
         FUNCTION_DROP,
         COMPACT, 
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index 25acff8..5d9794e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -32,9 +32,10 @@
 import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -75,6 +76,7 @@
         this.varCounter = varCounter;
     }
 
+    /*
     public void initialize(MetadataTransactionContext mdTxnCtx, Dataset targetDataset, Feed sourceFeed)
             throws MetadataException {
         query = new Query();
@@ -91,9 +93,9 @@
             }
         }
 
-        Triple<IAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+        Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
         try {
-            factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
+            factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) sourceFeed, mdTxnCtx);
             adapterOutputType = factoryOutput.second.getTypeName();
         } catch (AlgebricksException ae) {
             ae.printStackTrace();
@@ -135,7 +137,7 @@
             throw new MetadataException(pe);
         }
 
-    }
+    }*/
 
     public Identifier getDataverseName() {
         return dataverseName;
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java
new file mode 100644
index 0000000..939d777
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedPolicyStatement.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public class CreateFeedPolicyStatement implements Statement {
+
+    private final String policyName;
+    private final String sourcePolicyName;
+    private final Map<String, String> properties;
+    private final String sourcePolicyFile;
+    private final String description;
+    private final boolean ifNotExists;
+
+    public CreateFeedPolicyStatement(String policyName, String sourcePolicyName, Map<String, String> properties,
+            String description, boolean ifNotExists) {
+        this.policyName = policyName;
+        this.sourcePolicyName = sourcePolicyName;
+        this.properties = properties;
+        this.description = description;
+        this.ifNotExists = ifNotExists;
+        sourcePolicyFile = null;
+    }
+
+    public CreateFeedPolicyStatement(String policyName, String sourcePolicyFile, String description, boolean ifNotExists) {
+        this.policyName = policyName;
+        this.sourcePolicyName = null;
+        this.sourcePolicyFile = sourcePolicyFile;
+        this.description = description;
+        this.properties = null;
+        this.ifNotExists = ifNotExists;
+    }
+
+    public boolean getIfNotExists() {
+        return this.ifNotExists;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Statement.Kind.CREATE_FEED_POLICY;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreateFeedPolicyStatement(this, arg);
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+        visitor.visit(this, arg);
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+
+    public String getSourcePolicyName() {
+        return sourcePolicyName;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public String getSourcePolicyFile() {
+        return sourcePolicyFile;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
index 4e33f73..e189fd4 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateFeedStatement.java
@@ -14,47 +14,32 @@
  */
 package edu.uci.ics.asterix.aql.expression;
 
-import java.util.Map;
-
 import edu.uci.ics.asterix.aql.base.Statement;
 import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
 import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 
-public class CreateFeedStatement implements Statement {
+public abstract class CreateFeedStatement implements Statement {
 
-    private final Identifier dataverseName;
-    private final Identifier feedName;
-    private final String adapterName;
-    private final Map<String, String> adapterConfiguration;
+    private final Pair<Identifier, Identifier> qName;
     private final FunctionSignature appliedFunction;
     private final boolean ifNotExists;
 
-    public CreateFeedStatement(Identifier dataverseName, Identifier feedName, String adapterName,
-            Map<String, String> adapterConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) {
-        this.feedName = feedName;
-        this.dataverseName = dataverseName;
-        this.adapterName = adapterName;
-        this.adapterConfiguration = adapterConfiguration;
+    public CreateFeedStatement(Pair<Identifier, Identifier> qName, FunctionSignature appliedFunction,
+            boolean ifNotExists) {
+        this.qName = qName;
         this.appliedFunction = appliedFunction;
         this.ifNotExists = ifNotExists;
     }
 
     public Identifier getDataverseName() {
-        return dataverseName;
+        return qName.first;
     }
 
     public Identifier getFeedName() {
-        return feedName;
-    }
-
-    public String getAdapterName() {
-        return adapterName;
-    }
-
-    public Map<String, String> getAdapterConfiguration() {
-        return adapterConfiguration;
+        return qName.second;
     }
 
     public FunctionSignature getAppliedFunction() {
@@ -66,14 +51,10 @@
     }
 
     @Override
-    public Kind getKind() {
-        return Kind.CREATE_FEED;
-    }
+    public abstract Kind getKind();
 
     @Override
-    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
-        return visitor.visitCreateFeedStatement(this, arg);
-    }
+    public abstract <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException;
 
     @Override
     public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java
new file mode 100644
index 0000000..810e508
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreatePrimaryFeedStatement.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
+public class CreatePrimaryFeedStatement extends CreateFeedStatement implements Statement {
+
+    private final String adaptorName;
+    private final Map<String, String> adaptorConfiguration;
+
+    public CreatePrimaryFeedStatement(Pair<Identifier, Identifier> qName, String adaptorName,
+            Map<String, String> adaptorConfiguration, FunctionSignature appliedFunction, boolean ifNotExists) {
+        super(qName, appliedFunction, ifNotExists);
+        this.adaptorName = adaptorName;
+        this.adaptorConfiguration = adaptorConfiguration;
+    }
+
+    public String getAdaptorName() {
+        return adaptorName;
+    }
+
+    public Map<String, String> getAdaptorConfiguration() {
+        return adaptorConfiguration;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_PRIMARY_FEED;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreatePrimaryFeedStatement(this, arg);
+    }
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java
new file mode 100644
index 0000000..7d5f72a
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/CreateSecondaryFeedStatement.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
+/**
+ * Represents the AQL statement for creating a secondary feed.
+ * A secondary feed is one that derives its data from another (primary/secondary) feed.
+ */
+public class CreateSecondaryFeedStatement extends CreateFeedStatement implements Statement {
+
+    /** The source feed that provides data for this secondary feed. */
+    private final Pair<Identifier, Identifier> sourceQName;
+
+    public CreateSecondaryFeedStatement(Pair<Identifier, Identifier> qName, Pair<Identifier, Identifier> sourceQName,
+            FunctionSignature appliedFunction, boolean ifNotExists) {
+        super(qName, appliedFunction, ifNotExists);
+        this.sourceQName = sourceQName;
+    }
+
+    public String getSourceFeedDataverse() {
+        return sourceQName.first != null ? sourceQName.first.toString()
+                : getDataverseName() != null ? getDataverseName().getValue() : null;
+    }
+
+    public String getSourceFeedName() {
+        return sourceQName.second != null ? sourceQName.second.toString() : null;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.CREATE_SECONDARY_FEED;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitCreateSecondaryFeedStatement(this, arg);
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java
new file mode 100644
index 0000000..a9e9ee1
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/FeedPolicyDropStatement.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public class FeedPolicyDropStatement implements Statement {
+
+    private final Identifier dataverseName;
+    private final Identifier policyName;
+    private boolean ifExists;
+
+    public FeedPolicyDropStatement(Identifier dataverseName, Identifier policyName, boolean ifExists) {
+        this.dataverseName = dataverseName;
+        this.policyName = policyName;
+        this.ifExists = ifExists;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.DROP_FEED_POLICY;
+    }
+
+    public Identifier getDataverseName() {
+        return dataverseName;
+    }
+
+    public Identifier getPolicyName() {
+        return policyName;
+    }
+
+    public boolean getIfExists() {
+        return ifExists;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return visitor.visitDropFeedPolicyStatement(this, arg);
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+        visitor.visit(this, arg);
+    }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java
new file mode 100644
index 0000000..509a69a
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/SubscribeFeedStatement.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.io.StringReader;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.parser.ParseException;
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedActivity;
+import edu.uci.ics.asterix.common.feeds.FeedConnectionRequest;
+import edu.uci.ics.asterix.common.feeds.FeedId;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.entities.Feed;
+import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.entities.PrimaryFeed;
+import edu.uci.ics.asterix.metadata.entities.SecondaryFeed;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+
+/**
+ * Represents the AQL statement for subscribing to a feed.
+ * This AQL statement is private and may not be used by the end-user.
+ */
+public class SubscribeFeedStatement implements Statement {
+
+    private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName());
+    private final FeedConnectionRequest connectionRequest;
+    private Query query;
+    private int varCounter;
+    private final String[] locations;
+
+    public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed";
+
+    public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest) {
+        this.connectionRequest = subscriptionRequest;
+        this.varCounter = 0;
+        this.locations = locations;
+    }
+
+    public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
+        this.query = new Query();
+        FeedId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
+        Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId()
+                .getDataverse(), connectionRequest.getReceivingFeedId().getFeedName());
+        if (subscriberFeed == null) {
+            throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found.");
+        }
+
+        String feedOutputType = getOutputType(mdTxnCtx);
+        FunctionSignature appliedFunction = subscriberFeed.getAppliedFunction();
+        Function function = null;
+        if (appliedFunction != null) {
+            function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+            if (function == null) {
+                throw new MetadataException(" Unknown function " + function);
+            } else if (function.getParams().size() > 1) {
+                throw new MetadataException(" Incompatible function: " + appliedFunction
+                        + " Number if arguments must be 1");
+            }
+        }
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n");
+        builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
+        builder.append("set" + " " + FeedActivity.FeedActivityDetails.FEED_POLICY_NAME + " " + "'"
+                + connectionRequest.getPolicy() + "'" + ";\n");
+
+        builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " ");
+        builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'"
+                + sourceFeedId.getFeedName() + "'" + "," + "'" + connectionRequest.getReceivingFeedId().getFeedName()
+                + "'" + "," + "'" + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'"
+                + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")");
+
+        List<String> functionsToApply = connectionRequest.getFunctionsToApply();
+        if (functionsToApply != null && functionsToApply.isEmpty()) {
+            builder.append(" return $x");
+        } else {
+            String rValueName = "x";
+            String lValueName = "y";
+            int variableIndex = 0;
+            for (String functionName : functionsToApply) {
+                function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
+                variableIndex++;
+                switch (function.getLanguage().toUpperCase()) {
+                    case Function.LANGUAGE_AQL:
+                        builder.append(" let " + "$" + lValueName + variableIndex + ":=(" + function.getFunctionBody()
+                                + ")");
+                        builder.append("\n");
+                        break;
+                    case Function.LANGUAGE_JAVA:
+                        builder.append(" let " + "$" + lValueName + variableIndex + ":=" + functionName + "(" + "$"
+                                + rValueName + ")");
+                        rValueName = lValueName + variableIndex;
+                        break;
+                }
+                builder.append("\n");
+            }
+            builder.append("return $" + lValueName + variableIndex);
+        }
+        builder.append(")");
+        builder.append(";");
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Connect feed statement translated to\n" + builder.toString());
+        }
+        AQLParser parser = new AQLParser(new StringReader(builder.toString()));
+
+        List<Statement> statements;
+        try {
+            statements = parser.Statement();
+            query = ((InsertStatement) statements.get(3)).getQuery();
+        } catch (ParseException pe) {
+            throw new MetadataException(pe);
+        }
+
+    }
+
+    public Query getQuery() {
+        return query;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.SUBSCRIBE_FEED;
+    }
+
+    public String getPolicy() {
+        return connectionRequest.getPolicy();
+    }
+
+    public FeedConnectionRequest getSubscriptionRequest() {
+        return connectionRequest;
+    }
+
+    @Override
+    public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+    }
+
+    public String getDataverseName() {
+        return connectionRequest.getReceivingFeedId().getDataverse();
+    }
+
+    private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException {
+        String outputType = null;
+        FeedId feedId = connectionRequest.getReceivingFeedId();
+        Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getFeedName());
+        FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters());
+        try {
+            switch (feed.getFeedType()) {
+                case PRIMARY:
+                    Triple<IFeedAdapterFactory, ARecordType, AdapterType> factoryOutput = null;
+
+                    factoryOutput = FeedUtil.getPrimaryFeedFactoryAndOutput((PrimaryFeed) feed, policyAccessor,
+                            mdTxnCtx);
+                    outputType = factoryOutput.second.getTypeName();
+                    break;
+                case SECONDARY:
+                    outputType = FeedUtil.getSecondaryFeedOutput((SecondaryFeed) feed, policyAccessor, mdTxnCtx);
+                    break;
+            }
+            return outputType;
+
+        } catch (AlgebricksException ae) {
+            throw new MetadataException(ae);
+        }
+    }
+
+    public String[] getLocations() {
+        return locations;
+    }
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
index 21a2388..28acfda 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
@@ -37,6 +38,7 @@
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -61,13 +63,13 @@
 import edu.uci.ics.asterix.aql.expression.OrderbyClause;
 import edu.uci.ics.asterix.aql.expression.OrderbyClause.OrderModifier;
 import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
 import edu.uci.ics.asterix.aql.expression.QuantifiedPair;
 import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RecordConstructor;
 import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition;
 import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition.RecordKind;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.SetStatement;
 import edu.uci.ics.asterix.aql.expression.TypeDecl;
 import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
@@ -557,6 +559,15 @@
         // TODO Auto-generated method stub
 
     }
+    
+    @Override
+    public void visit(CreateFeedPolicyStatement stmt, Integer arg) throws AsterixException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void visit(FeedPolicyDropStatement stmt, Integer arg) throws AsterixException {
+    }
 
     @Override
     public void visit(RunStatement stmt, Integer arg) throws AsterixException {
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
index 84d9726..6716fff 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlExpressionVisitor.java
@@ -15,21 +15,24 @@
 package edu.uci.ics.asterix.aql.expression.visitor;
 
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
 import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
-import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
+import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.ForClause;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -157,9 +160,15 @@
 
     R visitConnectFeedStatement(ConnectFeedStatement del, T arg) throws AsterixException;
 
-    R visitCreateFeedStatement(CreateFeedStatement del, T arg) throws AsterixException;
+    R visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement cpfs, T arg) throws AsterixException;
+
+    R visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement csfs, T arg) throws AsterixException;
 
     R visitDropFeedStatement(FeedDropStatement del, T arg) throws AsterixException;
+    
+    R visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, T arg) throws AsterixException;
+
+    R visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, T arg) throws AsterixException;
 
     R visitCallExpr(CallExpr pf, T arg) throws AsterixException;
 
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
index f04c3a0..84e0825 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
@@ -18,6 +18,7 @@
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
@@ -30,6 +31,7 @@
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.ForClause;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -49,11 +51,11 @@
 import edu.uci.ics.asterix.aql.expression.OperatorExpr;
 import edu.uci.ics.asterix.aql.expression.OrderbyClause;
 import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
-import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
 import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RecordConstructor;
 import edu.uci.ics.asterix.aql.expression.RecordTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.SetStatement;
 import edu.uci.ics.asterix.aql.expression.TypeDecl;
 import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
@@ -164,8 +166,12 @@
 
     void visit(CreateFeedStatement stmt, T arg) throws AsterixException;
 
+    void visit(CreateFeedPolicyStatement stmt, T arg) throws AsterixException;
+
     void visit(FeedDropStatement stmt, T arg) throws AsterixException;
 
+    void visit(FeedPolicyDropStatement stmt, T arg) throws AsterixException;
+
     void visit(CreateFunctionStatement cfs, T arg) throws AsterixException;
 
     void visit(FunctionDropStatement fds, T arg) throws AsterixException;
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
index c416fa6..6b1df98 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/AqlRewriter.java
@@ -25,21 +25,24 @@
 import edu.uci.ics.asterix.aql.base.Expression;
 import edu.uci.ics.asterix.aql.base.Expression.Kind;
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -89,8 +92,6 @@
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.om.functions.AsterixFunction;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 
 public final class AqlRewriter {
 
@@ -553,8 +554,13 @@
         }
 
         @Override
-        public Void visitCreateFeedStatement(CreateFeedStatement del, Void arg) throws AsterixException {
-            // TODO Auto-generated method stub
+        public Void visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del, Void arg) throws AsterixException {
+            return null;
+        }
+
+        @Override
+        public Void visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del, Void arg)
+                throws AsterixException {
             return null;
         }
 
@@ -575,6 +581,18 @@
             // TODO Auto-generated method stub
             return null;
         }
+        
+        @Override
+        public Void visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, Void arg) throws AsterixException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Void visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, Void arg) throws AsterixException {
+            // TODO Auto-generated method stub
+            return null;
+        }
 
     }
 }
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
index edb080c..f03799a 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/CloneAndSubstituteVariablesVisitor.java
@@ -22,21 +22,24 @@
 import edu.uci.ics.asterix.aql.base.Expression;
 import edu.uci.ics.asterix.aql.base.IAqlExpression;
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -588,8 +591,15 @@
     }
 
     @Override
-    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateFeedStatement(CreateFeedStatement del,
-            List<VariableSubstitution> arg) throws AsterixException {
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreatePrimaryFeedStatement(
+            CreatePrimaryFeedStatement del, List<VariableSubstitution> arg) throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateSecondaryFeedStatement(
+            CreateSecondaryFeedStatement del, List<VariableSubstitution> arg) throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -614,4 +624,17 @@
         // TODO Auto-generated method stub
         return null;
     }
+    
+
+    @Override
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitCreateFeedPolicyStatement(
+            CreateFeedPolicyStatement cfps, List<VariableSubstitution> arg) throws AsterixException {
+        return null;
+    }
+
+    @Override
+    public Pair<IAqlExpression, List<VariableSubstitution>> visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs,
+            List<VariableSubstitution> arg) throws AsterixException {
+        return null;
+    }
 }
\ No newline at end of file
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
index 58433d3..394b507 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/rewrites/InlineUdfsVisitor.java
@@ -23,21 +23,24 @@
 import edu.uci.ics.asterix.aql.base.Expression.Kind;
 import edu.uci.ics.asterix.aql.base.IAqlExpression;
 import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
-import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
-import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
 import edu.uci.ics.asterix.aql.expression.CreateIndexStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DatasetDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDecl;
 import edu.uci.ics.asterix.aql.expression.DataverseDropStatement;
 import edu.uci.ics.asterix.aql.expression.DeleteStatement;
+import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
 import edu.uci.ics.asterix.aql.expression.DistinctClause;
 import edu.uci.ics.asterix.aql.expression.DropStatement;
 import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
+import edu.uci.ics.asterix.aql.expression.FeedPolicyDropStatement;
 import edu.uci.ics.asterix.aql.expression.FieldAccessor;
 import edu.uci.ics.asterix.aql.expression.FieldBinding;
 import edu.uci.ics.asterix.aql.expression.ForClause;
@@ -522,7 +525,15 @@
     }
 
     @Override
-    public Boolean visitCreateFeedStatement(CreateFeedStatement del, List<FunctionDecl> arg) throws AsterixException {
+    public Boolean visitCreatePrimaryFeedStatement(CreatePrimaryFeedStatement del, List<FunctionDecl> arg)
+            throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Boolean visitCreateSecondaryFeedStatement(CreateSecondaryFeedStatement del, List<FunctionDecl> arg)
+            throws AsterixException {
         // TODO Auto-generated method stub
         return null;
     }
@@ -544,4 +555,18 @@
         // TODO Auto-generated method stub
         return null;
     }
+    
+    @Override
+    public Boolean visitCreateFeedPolicyStatement(CreateFeedPolicyStatement cfps, List<FunctionDecl> arg)
+            throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Boolean visitDropFeedPolicyStatement(FeedPolicyDropStatement dfs, List<FunctionDecl> arg)
+            throws AsterixException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 8150871..7044962 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -37,6 +37,9 @@
 import edu.uci.ics.asterix.aql.expression.CallExpr;
 import edu.uci.ics.asterix.aql.expression.CompactStatement;
 import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateFeedPolicyStatement;
+import edu.uci.ics.asterix.aql.expression.CreatePrimaryFeedStatement;
+import edu.uci.ics.asterix.aql.expression.CreateSecondaryFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
 import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
@@ -336,6 +339,7 @@
     | stmt = DataverseSpecification()
     | stmt = FunctionSpecification()
     | stmt = FeedSpecification()
+    | stmt = FeedPolicySpecification()
   )
   {
     return stmt;
@@ -653,24 +657,61 @@
   Map<String,String> properties = null;
   FunctionSignature appliedFunction = null;
   CreateFeedStatement cfs = null;
+  Pair<Identifier,Identifier> sourceNameComponents = null;
+  
 }
 {
   (
-    "feed"  nameComponents = QualifiedName()
-    ifNotExists = IfNotExists()
-    "using" adapterName = AdapterName() properties = Configuration()
-    (appliedFunction = ApplyFunction())?
+    "secondary" "feed"  nameComponents = QualifiedName() ifNotExists = IfNotExists()
+      <FROM> "feed" sourceNameComponents = QualifiedName() (appliedFunction = ApplyFunction())?
       {
-        cfs = new CreateFeedStatement(nameComponents.first,
-                                   nameComponents.second, adapterName, properties, appliedFunction, ifNotExists);
+        cfs = new CreateSecondaryFeedStatement(nameComponents,
+                                   sourceNameComponents, appliedFunction, ifNotExists);
       }
-
+     |
+     ("primary")? "feed" nameComponents = QualifiedName() ifNotExists = IfNotExists()
+      "using" adapterName = AdapterName() properties = Configuration() (appliedFunction = ApplyFunction())?  
+       {
+        cfs = new CreatePrimaryFeedStatement(nameComponents,
+                                    adapterName, properties, appliedFunction, ifNotExists);
+       }
   )
     {
       return cfs;
     }
 }
 
+CreateFeedPolicyStatement FeedPolicySpecification() throws ParseException:
+{
+  String policyName = null;  
+  String basePolicyName = null; 
+  String sourcePolicyFile = null;
+  String definition = null;
+  boolean ifNotExists = false;
+  Map<String,String> properties = null;
+  CreateFeedPolicyStatement cfps = null;
+}
+{
+  (
+    "ingestion" "policy"  policyName = Identifier() ifNotExists = IfNotExists()
+      <FROM> 
+      ("policy" basePolicyName = Identifier() properties = Configuration() ("definition" definition = StringLiteral())?  
+      {
+        cfps = new CreateFeedPolicyStatement(policyName,
+                                   basePolicyName, properties, definition, ifNotExists);
+      }
+     | "path" sourcePolicyFile = Identifier() ("definition" definition = StringLiteral())?  
+       {
+        cfps = new CreateFeedPolicyStatement(policyName, sourcePolicyFile, definition, ifNotExists);
+       }
+     ) 
+       
+  )
+    {
+      return cfps;
+    }
+}
+
 
 
 List<VarIdentifier> ParameterList() throws ParseException: