Generating is-not-null filtering expr inside IntroduceSecondaryIndexInsertDeleteRule. Added AsterixTupleFilter to be passed to index insert/delete ops.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@272 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 4ef030b..179f97c 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -144,9 +144,11 @@
                     secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
                             secondaryKeyVar)));
                 }
+                Mutable<ILogicalExpression> filterExpression = createFilterExpression(secondaryKeyVars);
                 AqlIndex dataSourceIndex = new AqlIndex(index, metadata, datasetName);
                 IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
-                        insertOp.getPrimaryKeyExpressions(), secondaryExpressions, insertOp.getOperation());
+                        insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
+                        insertOp.getOperation());
                 indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assign));
                 assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
                 project.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
@@ -183,9 +185,11 @@
                     secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
                             secondaryKeyVar)));
                 }
+                Mutable<ILogicalExpression> filterExpression = createFilterExpression(keyVarList);
                 AqlIndex dataSourceIndex = new AqlIndex(index, metadata, datasetName);
                 IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
-                        insertOp.getPrimaryKeyExpressions(), secondaryExpressions, insertOp.getOperation());
+                        insertOp.getPrimaryKeyExpressions(), secondaryExpressions, filterExpression,
+                        insertOp.getOperation());
                 AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList);
                 indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
                 assignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(assign));
@@ -204,6 +208,29 @@
         return true;
     }
 
+    // TODO: Return null here for non-nullable fields.
+    private Mutable<ILogicalExpression> createFilterExpression(List<LogicalVariable> secondaryKeyVars) {
+        List<Mutable<ILogicalExpression>> filterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        for (LogicalVariable secondaryKeyVar : secondaryKeyVars) {
+            // Add 'is not null' to all secondary index keys as a filtering condition.
+            ScalarFunctionCallExpression isNullFuncExpr = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL),
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVar)));
+            ScalarFunctionCallExpression notFuncExpr = new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.NOT),
+                    new MutableObject<ILogicalExpression>(isNullFuncExpr));                    
+            filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
+        }
+        Mutable<ILogicalExpression> filterExpression = null;
+        if (filterExpressions.size() > 0) {
+            filterExpression = new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                    FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.AND), filterExpressions));
+        } else {
+            filterExpression = filterExpressions.get(0);
+        }
+        return filterExpression;
+    }
+    
     public static IAType keyFieldType(String expr, ARecordType recType) throws AlgebricksException {
         String[] names = recType.getFieldNames();
         int n = names.length;
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index a80aadc..02b688b 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -45,23 +46,28 @@
 
     private final long transactionId;
 
-    /* TODO: Index operators should live in Hyracks. Right now, they are needed here in Asterix
-     * as a hack to provide transactionIDs. The Asterix verions of this operator will disappear 
-     * and the operator will come from Hyracks once the LSM/Recovery/Transactions world has 
-     * been introduced.
-     */
-    public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
-            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
-            IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOp op,
-            IIndexDataflowHelperFactory dataflowHelperFactory, IOperationCallbackProvider opCallbackProvider,
-            long transactionId) {
-        super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory, opCallbackProvider);
-        this.fieldPermutation = fieldPermutation;
-        this.op = op;
-        this.transactionId = transactionId;
-    }
+	/*
+	 * TODO: Index operators should live in Hyracks. Right now, they are needed
+	 * here in Asterix as a hack to provide transactionIDs. The Asterix verions
+	 * of this operator will disappear and the operator will come from Hyracks
+	 * once the LSM/Recovery/Transactions world has been introduced.
+	 */
+	public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec,
+			RecordDescriptor recDesc, IStorageManagerInterface storageManager,
+			IIndexRegistryProvider<IIndex> indexRegistryProvider,
+			IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+			IBinaryComparatorFactory[] comparatorFactories,
+			int[] fieldPermutation, IndexOp op,
+			IIndexDataflowHelperFactory dataflowHelperFactory,
+			ITupleFilterFactory tupleFilterFactory,
+			IOperationCallbackProvider opCallbackProvider, long transactionId) {
+		super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider,
+				fileSplitProvider, typeTraits, comparatorFactories,
+				dataflowHelperFactory, tupleFilterFactory, opCallbackProvider);
+		this.fieldPermutation = fieldPermutation;
+		this.op = op;
+		this.transactionId = transactionId;
+	}
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 0749693..77aa9fa 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -37,6 +37,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
@@ -54,6 +56,7 @@
     private ILockManager lockManager;
     private final TransactionContext txnContext;
     private TreeLogger treeLogger;
+    private ITupleFilter tupleFilter;
     private final TransactionProvider transactionProvider;
 
     /* TODO: Index operators should live in Hyracks. Right now, they are needed here in Asterix
@@ -64,14 +67,16 @@
     public TreeIndexInsertUpdateDeleteOperatorNodePushable(TransactionContext txnContext,
             AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
             IRecordDescriptorProvider recordDescProvider, IndexOp op) {
-        boolean createIfNotExists = (op == IndexOp.INSERT);
         treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
         this.recordDescProvider = recordDescProvider;
         this.op = op;
         tuple.setFieldPermutation(fieldPermutation);
         this.txnContext = txnContext;
-
+        ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
+        if (tupleFilterFactory != null) {
+            tupleFilter = tupleFilterFactory.createTupleFilter();
+        }
         AsterixAppRuntimeContext runtimeContext = (AsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         transactionProvider = runtimeContext.getTransactionProvider();
@@ -122,6 +127,9 @@
         try {
             for (int i = 0; i < tupleCount; i++) {
                 tuple.reset(accessor, i);
+                if (tupleFilter != null && !tupleFilter.accept(tuple)) {
+                    continue;
+                }
                 switch (op) {
                     case INSERT: {
                         lockManager.lock(txnContext, resourceId,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 4eb3456..9d3e387 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -756,7 +757,7 @@
 				splitsAndConstraint.first, typeTraits, comparatorFactories,
 				fieldPermutation, IndexOp.INSERT,
 				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE, txnId);
+				null, NoOpOperationCallbackProvider.INSTANCE, txnId);
 		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
 				btreeBulkLoad, splitsAndConstraint.second);
 	}
@@ -814,17 +815,18 @@
 				splitsAndConstraint.first, typeTraits, comparatorFactories,
 				fieldPermutation, IndexOp.DELETE,
 				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE, txnId);
+				null, NoOpOperationCallbackProvider.INSTANCE, txnId);
 		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
 				btreeBulkLoad, splitsAndConstraint.second);
 	}
 
 	@Override
+	// TODO: Use filterExpr.
 	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
 			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
 			IOperatorSchema propagatedSchema,
 			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
 			JobGenContext context, JobSpecification spec)
 			throws AlgebricksException {
 		String indexName = dataSourceIndex.getId();
@@ -849,11 +851,12 @@
 	}
 
 	@Override
+	// TODO: Use filterExpr.
 	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
 			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
 			IOperatorSchema propagatedSchema,
 			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
 			JobGenContext context, JobSpecification spec)
 			throws AlgebricksException {
 		String indexName = dataSourceIndex.getId();
@@ -953,7 +956,7 @@
 				appContext.getIndexRegistryProvider(),
 				splitsAndConstraint.first, typeTraits, comparatorFactories,
 				fieldPermutation, indexOp, new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE, txnId);
+				null, NoOpOperationCallbackProvider.INSTANCE, txnId);
 		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
 				btreeBulkLoad, splitsAndConstraint.second);
 	}
@@ -1040,7 +1043,7 @@
 				splitsAndConstraint.first, typeTraits, comparatorFactories,
 				fieldPermutation, indexOp, new RTreeDataflowHelperFactory(
 						valueProviderFactories),
-				NoOpOperationCallbackProvider.INSTANCE, txnId);
+				null, NoOpOperationCallbackProvider.INSTANCE, txnId);
 		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
 				rtreeUpdate, splitsAndConstraint.second);
 	}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
new file mode 100644
index 0000000..7dd7ac1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.asterix.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
+
+public class AsterixTupleFilter implements ITupleFilter {
+	
+	private final IBinaryBooleanInspector boolInspector;
+	private final IEvaluator eval;
+    private final ArrayBackedValueStorage evalOut = new ArrayBackedValueStorage();
+    
+	public AsterixTupleFilter(IEvaluatorFactory evalFactory,
+			IBinaryBooleanInspector boolInspector) throws AlgebricksException {
+		this.boolInspector = boolInspector;
+		eval = evalFactory.createEvaluator(evalOut);
+	}
+	
+	@Override
+	public boolean accept(IFrameTupleReference tuple) throws Exception {
+		evalOut.reset();
+		eval.evaluate(tuple);
+		return boolInspector.getBooleanValue(evalOut.getBytes(), 0, 2);
+	}
+}