prevented secondary index insert operator in feed from dropping frames
Change-Id: I5a22b7a56e476b0cb4535ba4323419ef7eb69fb1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/446
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index a5dcf18..4118f5e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -41,6 +41,10 @@
private final boolean isPrimary;
+ public boolean isPrimary() {
+ return isPrimary;
+ }
+
public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
boolean isPrimary) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
index e0663ed..cafc699 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/FeedRuntimeInputHandler.java
@@ -49,7 +49,7 @@
private final FeedConnectionId connectionId;
private final FeedRuntimeId runtimeId;
private final FeedPolicyAccessor feedPolicyAccessor;
- private final boolean bufferingEnabled;
+ private boolean bufferingEnabled;
private final IExceptionHandler exceptionHandler;
private final FeedFrameDiscarder discarder;
private final FeedFrameSpiller spiller;
@@ -427,4 +427,12 @@
}
}
}
+
+ public boolean isBufferingEnabled() {
+ return bufferingEnabled;
+ }
+
+ public void setBufferingEnabled(boolean bufferingEnabled) {
+ this.bufferingEnabled = bufferingEnabled;
+ }
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
index 5275e4c..a18f4d8 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
@@ -24,6 +24,7 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedRuntime;
import org.apache.asterix.common.feeds.FeedRuntimeId;
@@ -130,6 +131,12 @@
this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
nPartitions);
+ if(coreOperator instanceof AsterixLSMInsertDeleteOperatorNodePushable){
+ AsterixLSMInsertDeleteOperatorNodePushable indexOp = (AsterixLSMInsertDeleteOperatorNodePushable) coreOperator;
+ if(!indexOp.isPrimary()){
+ inputSideHandler.setBufferingEnabled(false);
+ }
+ }
setupBasicRuntime(inputSideHandler);
}