Merge branch 'salsubaiee/master_flush_frames_in_feeds_rapidly'
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index a24a459..ffde764 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceMaterializationForInsertWithSelfScanRule;
-import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectAssignRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceStaticTypeCastForInsertRule;
 import edu.uci.ics.asterix.optimizer.rules.IntroduceUnnestForCollectionToSequenceRule;
@@ -264,7 +264,7 @@
         physicalRewritesTopLevel.add(new PushLimitDownRule());
         physicalRewritesTopLevel.add(new IntroduceProjectsRule());
         physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
-        physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectRule());
+        physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectAssignRule());
         physicalRewritesTopLevel.add(new SetExecutionModeRule());
         return physicalRewritesTopLevel;
     }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
similarity index 67%
rename from asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectRule.java
rename to asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
index d3e11ed2..98f7eeb 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
@@ -22,21 +22,23 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
- * This rule will search for project operators in an insert/delete/update plan and
- * pass a hint to all those projects between the first "insert" and the commit
- * operator. This hint is used by the project operator so that frames are pushed to
+ * This rule will search for project and assign operators in an insert/delete/update plan and
+ * pass a hint to all of them. This hint is used by the project and assign operators so that frames are pushed to
  * the next operator without waiting until they get full. The purpose of this is to
- * reduce the time of holding exclusive locks on the keys that have been inserted.
+ * reduce the time of holding exclusive locks on the keys that have been inserted. Rapid flushing also allows
+ * feeds batching to work correctly.
  * 
  * @author salsubaiee
  */
-public class IntroduceRapidFrameFlushProjectRule implements IAlgebraicRewriteRule {
+public class IntroduceRapidFrameFlushProjectAssignRule implements IAlgebraicRewriteRule {
 
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
@@ -52,44 +54,49 @@
             return false;
         }
 
-        AbstractLogicalOperator descendantOp = op;
-        while (descendantOp != null) {
-            if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+        for (int i = 0; i < op.getInputs().size(); ++i) {
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT
+                    || descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
                 if (descendantOp.getPhysicalOperator() == null) {
                     return false;
                 }
-            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE) {
-                break;
             }
-            descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
+            checkIfRuleIsApplicable(descendantOp);
         }
         return true;
     }
 
+    /** Sets the rapid-frame-flush hint on every PROJECT/ASSIGN below {@code op}; returns true if any was set. */
+    private boolean changeRule(AbstractLogicalOperator op) {
+        boolean planModified = false;
+        for (int i = 0; i < op.getInputs().size(); ++i) {
+            AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+                ProjectOperator projectOp = (ProjectOperator) descendantOp;
+                StreamProjectPOperator physicalOp = (StreamProjectPOperator) projectOp.getPhysicalOperator();
+                physicalOp.setRapidFrameFlush(true);
+                planModified = true;
+            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                AssignOperator assignOp = (AssignOperator) descendantOp;
+                AssignPOperator physicalOp = (AssignPOperator) assignOp.getPhysicalOperator();
+                physicalOp.setRapidFrameFlush(true);
+                planModified = true;
+            }
+            // Non-short-circuit |= : always recurse, and keep changes made deeper in the plan visible to the caller.
+            planModified |= changeRule(descendantOp);
+        }
+        return planModified;
+    }
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
 
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-
         if (!checkIfRuleIsApplicable(op)) {
             return false;
         }
-        AbstractLogicalOperator descendantOp = op;
-        ProjectOperator projectOp = null;
-
-        boolean planModified = false;
-        while (descendantOp != null) {
-            if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
-                projectOp = (ProjectOperator) descendantOp;
-                StreamProjectPOperator physicalOp = (StreamProjectPOperator) projectOp.getPhysicalOperator();
-                physicalOp.setRapidFrameFlush(true);
-                planModified = true;
-            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE) {
-                break;
-            }
-            descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
-        }
-        return planModified;
+        return changeRule(op);
     }
 }
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index e931271..8aba12a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -1268,6 +1268,7 @@
         acquireReadLatch();
 
         try {
+            metadataProvider.setWriteTransaction(true);
             BeginFeedStatement bfs = (BeginFeedStatement) stmt;
             String dataverseName = getActiveDataverseName(bfs.getDataverseName());