Merge branch 'salsubaiee/master_flush_frames_in_feeds_rapidly'
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index a24a459..ffde764 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -33,7 +33,7 @@
import edu.uci.ics.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceMaterializationForInsertWithSelfScanRule;
-import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectAssignRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceStaticTypeCastForInsertRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceUnnestForCollectionToSequenceRule;
@@ -264,7 +264,7 @@
physicalRewritesTopLevel.add(new PushLimitDownRule());
physicalRewritesTopLevel.add(new IntroduceProjectsRule());
physicalRewritesTopLevel.add(new SetAlgebricksPhysicalOperatorsRule());
- physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectRule());
+ physicalRewritesTopLevel.add(new IntroduceRapidFrameFlushProjectAssignRule());
physicalRewritesTopLevel.add(new SetExecutionModeRule());
return physicalRewritesTopLevel;
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
similarity index 67%
rename from asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectRule.java
rename to asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
index d3e11ed2..98f7eeb 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
@@ -22,21 +22,23 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
- * This rule will search for project operators in an insert/delete/update plan and
- * pass a hint to all those projects between the first "insert" and the commit
- * operator. This hint is used by the project operator so that frames are pushed to
+ * This rule will search for project and assign operators in an insert/delete/update plan and
+ * pass a hint to all of them. This hint is used by the project and assign operators so that frames are pushed to
* the next operator without waiting until they get full. The purpose of this is to
- * reduce the time of holding exclusive locks on the keys that have been inserted.
+ * reduce the time of holding exclusive locks on the keys that have been inserted. Also to allow feeds batching
+ * to work correctly.
*
* @author salsubaiee
*/
-public class IntroduceRapidFrameFlushProjectRule implements IAlgebraicRewriteRule {
+public class IntroduceRapidFrameFlushProjectAssignRule implements IAlgebraicRewriteRule {
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
@@ -52,44 +54,49 @@
return false;
}
- AbstractLogicalOperator descendantOp = op;
- while (descendantOp != null) {
- if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+ for (int i = 0; i < op.getInputs().size(); ++i) {
+ AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+
+ if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT
+ || descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
if (descendantOp.getPhysicalOperator() == null) {
return false;
}
- } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE) {
- break;
}
- descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
+ checkIfRuleIsApplicable(descendantOp);
}
return true;
}
+ private boolean changeRule(AbstractLogicalOperator op) {
+ boolean planModified = false;
+ for (int i = 0; i < op.getInputs().size(); ++i) {
+ AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+
+ if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+ ProjectOperator projectOp = (ProjectOperator) descendantOp;
+ StreamProjectPOperator physicalOp = (StreamProjectPOperator) projectOp.getPhysicalOperator();
+ physicalOp.setRapidFrameFlush(true);
+ planModified = true;
+ } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+ AssignOperator assignOp = (AssignOperator) descendantOp;
+ AssignPOperator physicalOp = (AssignPOperator) assignOp.getPhysicalOperator();
+ physicalOp.setRapidFrameFlush(true);
+ planModified = true;
+ }
+ changeRule(descendantOp);
+ }
+ return planModified;
+ }
+
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-
if (!checkIfRuleIsApplicable(op)) {
return false;
}
- AbstractLogicalOperator descendantOp = op;
- ProjectOperator projectOp = null;
-
- boolean planModified = false;
- while (descendantOp != null) {
- if (descendantOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
- projectOp = (ProjectOperator) descendantOp;
- StreamProjectPOperator physicalOp = (StreamProjectPOperator) projectOp.getPhysicalOperator();
- physicalOp.setRapidFrameFlush(true);
- planModified = true;
- } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE) {
- break;
- }
- descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
- }
- return planModified;
+ return changeRule(op);
}
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index e931271..8aba12a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -1268,6 +1268,7 @@
acquireReadLatch();
try {
+ metadataProvider.setWriteTransaction(true);
BeginFeedStatement bfs = (BeginFeedStatement) stmt;
String dataverseName = getActiveDataverseName(bfs.getDataverseName());