Generating is-not-null filtering expr inside IntroduceSecondaryIndexInsertDeleteRule. Added AsterixTupleFilter to be passed to index insert/delete ops.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@272 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 4ef030b..179f97c 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -144,9 +144,11 @@
secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
secondaryKeyVar)));
}
+ Mutable<ILogicalExpression> filterExpression = createFilterExpression(secondaryKeyVars);
AqlIndex dataSourceIndex = new AqlIndex(index, metadata, datasetName);
IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
- insertOp.getPrimaryKeyExpressions(), secondaryExpressions, insertOp.getOperation());
+ insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
+ insertOp.getOperation());
indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assign));
assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
project.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
@@ -183,9 +185,11 @@
secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
secondaryKeyVar)));
}
+ Mutable<ILogicalExpression> filterExpression = createFilterExpression(keyVarList);
AqlIndex dataSourceIndex = new AqlIndex(index, metadata, datasetName);
IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
- insertOp.getPrimaryKeyExpressions(), secondaryExpressions, insertOp.getOperation());
+ insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
+ insertOp.getOperation());
AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList);
indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
assignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(assign));
@@ -204,6 +208,29 @@
return true;
}
+ // TODO: Return null here for non-nullable fields.
+ private Mutable<ILogicalExpression> createFilterExpression(List<LogicalVariable> secondaryKeyVars) {
+ List<Mutable<ILogicalExpression>> filterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ for (LogicalVariable secondaryKeyVar : secondaryKeyVars) {
+ // Add 'is not null' to all secondary index keys as a filtering condition.
+ ScalarFunctionCallExpression isNullFuncExpr = new ScalarFunctionCallExpression(
+ FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL),
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
+ ScalarFunctionCallExpression notFuncExpr = new ScalarFunctionCallExpression(
+ FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.NOT),
+ new MutableObject<ILogicalExpression>(isNullFuncExpr));
+ filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
+ }
+ Mutable<ILogicalExpression> filterExpression = null;
+ if (filterExpressions.size() > 0) {
+ filterExpression = new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+ FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.AND), filterExpressions));
+ } else {
+ filterExpression = filterExpressions.get(0);
+ }
+ return filterExpression;
+ }
+
public static IAType keyFieldType(String expr, ARecordType recType) throws AlgebricksException {
String[] names = recType.getFieldNames();
int n = names.length;
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index a80aadc..02b688b 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -45,23 +46,28 @@
private final long transactionId;
- /* TODO: Index operators should live in Hyracks. Right now, they are needed here in Asterix
- * as a hack to provide transactionIDs. The Asterix verions of this operator will disappear
- * and the operator will come from Hyracks once the LSM/Recovery/Transactions world has
- * been introduced.
- */
- public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
- IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
- IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOp op,
- IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider,
- long transactionId) {
- super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory, opCallbackProvider);
- this.fieldPermutation = fieldPermutation;
- this.op = op;
- this.transactionId = transactionId;
- }
+ /*
+ * TODO: Index operators should live in Hyracks. Right now, they are needed
+ * here in Asterix as a hack to provide transactionIDs. The Asterix verions
+ * of this operator will disappear and the operator will come from Hyracks
+ * once the LSM/Recovery/Transactions world has been introduced.
+ */
+ public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec,
+ RecordDescriptor recDesc, IStorageManagerInterface storageManager,
+ IIndexRegistryProvider<IIndex> indexRegistryProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories,
+ int[] fieldPermutation, IndexOp op,
+ IIndexDataflowHelperFactory dataflowHelperFactory,
+ ITupleFilterFactory tupleFilterFactory,
+ IOperationCallbackProvider opCallbackProvider, long transactionId) {
+ super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider,
+ fileSplitProvider, typeTraits, comparatorFactories,
+ dataflowHelperFactory, tupleFilterFactory, opCallbackProvider);
+ this.fieldPermutation = fieldPermutation;
+ this.op = op;
+ this.transactionId = transactionId;
+ }
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 0749693..77aa9fa 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -37,6 +37,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
@@ -54,6 +56,7 @@
private ILockManager lockManager;
private final TransactionContext txnContext;
private TreeLogger treeLogger;
+ private ITupleFilter tupleFilter;
private final TransactionProvider transactionProvider;
/* TODO: Index operators should live in Hyracks. Right now, they are needed here in Asterix
@@ -64,14 +67,16 @@
public TreeIndexInsertUpdateDeleteOperatorNodePushable(TransactionContext txnContext,
AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
IRecordDescriptorProvider recordDescProvider, IndexOp op) {
- boolean createIfNotExists = (op == IndexOp.INSERT);
treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
this.recordDescProvider = recordDescProvider;
this.op = op;
tuple.setFieldPermutation(fieldPermutation);
this.txnContext = txnContext;
-
+ ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
+ if (tupleFilterFactory != null) {
+ tupleFilter = tupleFilterFactory.createTupleFilter();
+ }
AsterixAppRuntimeContext runtimeContext = (AsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
transactionProvider = runtimeContext.getTransactionProvider();
@@ -122,6 +127,9 @@
try {
for (int i = 0; i < tupleCount; i++) {
tuple.reset(accessor, i);
+ if (tupleFilter != null && !tupleFilter.accept(tuple)) {
+ continue;
+ }
switch (op) {
case INSERT: {
lockManager.lock(txnContext, resourceId,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 4eb3456..9d3e387 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -756,7 +757,7 @@
splitsAndConstraint.first, typeTraits, comparatorFactories,
fieldPermutation, IndexOp.INSERT,
new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE, txnId);
+ null, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
btreeBulkLoad, splitsAndConstraint.second);
}
@@ -814,17 +815,18 @@
splitsAndConstraint.first, typeTraits, comparatorFactories,
fieldPermutation, IndexOp.DELETE,
new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE, txnId);
+ null, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
btreeBulkLoad, splitsAndConstraint.second);
}
@Override
+ // TODO: Use filterExpr.
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec)
throws AlgebricksException {
String indexName = dataSourceIndex.getId();
@@ -849,11 +851,12 @@
}
@Override
+ // TODO: Use filterExpr.
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec)
throws AlgebricksException {
String indexName = dataSourceIndex.getId();
@@ -953,7 +956,7 @@
appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories,
fieldPermutation, indexOp, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE, txnId);
+ null, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
btreeBulkLoad, splitsAndConstraint.second);
}
@@ -1040,7 +1043,7 @@
splitsAndConstraint.first, typeTraits, comparatorFactories,
fieldPermutation, indexOp, new RTreeDataflowHelperFactory(
valueProviderFactories),
- NoOpOperationCallbackProvider.INSTANCE, txnId);
+ null, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
rtreeUpdate, splitsAndConstraint.second);
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
new file mode 100644
index 0000000..7dd7ac1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.asterix.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
+
+public class AsterixTupleFilter implements ITupleFilter {
+
+ private final IBinaryBooleanInspector boolInspector;
+ private final IEvaluator eval;
+ private final ArrayBackedValueStorage evalOut = new ArrayBackedValueStorage();
+
+ public AsterixTupleFilter(IEvaluatorFactory evalFactory,
+ IBinaryBooleanInspector boolInspector) throws AlgebricksException {
+ this.boolInspector = boolInspector;
+ eval = evalFactory.createEvaluator(evalOut);
+ }
+
+ @Override
+ public boolean accept(IFrameTupleReference tuple) throws Exception {
+ evalOut.reset();
+ eval.evaluate(tuple);
+ return boolInspector.getBooleanValue(evalOut.getBytes(), 0, 2);
+ }
+}