First running prototype of pipeline of insert/delete ops with is-not-null filters. All tests pass, but there is still some room for optimization.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@275 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 179f97c..8543edd 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -222,7 +222,7 @@
             filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
         }
         Mutable<ILogicalExpression> filterExpression = null;
-        if (filterExpressions.size() > 0) {
+        if (filterExpressions.size() > 1) {
             filterExpression = new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
                     FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.AND), filterExpressions));
         } else {
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 77aa9fa..af4eefc 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
@@ -51,6 +52,7 @@
     private final IRecordDescriptorProvider recordDescProvider;
     private final IndexOp op;
     private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+    private FrameTupleReference frameTuple;
     private ByteBuffer writeBuffer;
     private IIndexAccessor indexAccessor;
     private ILockManager lockManager;
@@ -73,10 +75,6 @@
         this.op = op;
         tuple.setFieldPermutation(fieldPermutation);
         this.txnContext = txnContext;
-        ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
-        if (tupleFilterFactory != null) {
-            tupleFilter = tupleFilterFactory.createTupleFilter();
-        }
         AsterixAppRuntimeContext runtimeContext = (AsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         transactionProvider = runtimeContext.getTransactionProvider();
@@ -109,6 +107,11 @@
             treeIndexHelper.init(false);
             ITreeIndex treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
             indexAccessor = treeIndex.createAccessor();
+            ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
+            if (tupleFilterFactory != null) {
+                tupleFilter = tupleFilterFactory.createTupleFilter();
+                frameTuple = new FrameTupleReference();
+            }
             initializeTransactionSupport();
         } catch (Exception e) {
             // cleanup in case of failure
@@ -125,11 +128,14 @@
         byte[] resourceId = DataUtil.intToByteArray(fileId);
         int tupleCount = accessor.getTupleCount();
         try {
-            for (int i = 0; i < tupleCount; i++) {
-                tuple.reset(accessor, i);
-                if (tupleFilter != null && !tupleFilter.accept(tuple)) {
-                    continue;
+            for (int i = 0; i < tupleCount; i++) {                
+                if (tupleFilter != null) {
+                    frameTuple.reset(accessor, i);
+                    if (!tupleFilter.accept(frameTuple)) {
+                        continue;
+                    }
                 }
+                tuple.reset(accessor, i);
                 switch (op) {
                     case INSERT: {
                         lockManager.lock(txnContext, resourceId,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 9d3e387..a67d0c3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -41,6 +41,7 @@
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.runtime.base.AsterixTupleFilterFactory;
 import edu.uci.ics.asterix.runtime.transaction.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -49,6 +50,8 @@
 import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ILogicalExpressionJobGen;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
@@ -824,11 +827,12 @@
 	// TODO: Use filterExpr.
 	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
 			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
-			IOperatorSchema propagatedSchema,
+			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+			IVariableTypeEnvironment typeEnv,
 			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-			JobGenContext context, JobSpecification spec)
-			throws AlgebricksException {
+			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
+			RecordDescriptor recordDesc, JobGenContext context,
+			JobSpecification spec) throws AlgebricksException {
 		String indexName = dataSourceIndex.getId();
 		String datasetName = dataSourceIndex.getDataSource().getId()
 				.getDatasetName();
@@ -839,26 +843,29 @@
 		}
 		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
 				compiledDatasetDecl, indexName);
-
-		if (cid.getKind() == IndexKind.BTREE)
+		AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
+				inputSchemas, typeEnv, filterExpr, context);
+		if (cid.getKind() == IndexKind.BTREE) {
 			return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
 					IndexOp.INSERT);
-		else
+		} else {
 			return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
 					IndexOp.INSERT);
+		}
 	}
 
 	@Override
 	// TODO: Use filterExpr.
 	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
 			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
-			IOperatorSchema propagatedSchema,
+			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+			IVariableTypeEnvironment typeEnv,
 			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
-			JobGenContext context, JobSpecification spec)
-			throws AlgebricksException {
+			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
+			RecordDescriptor recordDesc, JobGenContext context,
+			JobSpecification spec) throws AlgebricksException {
 		String indexName = dataSourceIndex.getId();
 		String datasetName = dataSourceIndex.getDataSource().getId()
 				.getDatasetName();
@@ -869,23 +876,39 @@
 		}
 		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
 				compiledDatasetDecl, indexName);
-		if (cid.getKind() == IndexKind.BTREE)
+		AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
+				inputSchemas, typeEnv, filterExpr, context);
+		if (cid.getKind() == IndexKind.BTREE) {
 			return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
 					IndexOp.DELETE);
-		else
+		} else {
 			return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, recordDesc, context, spec,
+					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
 					IndexOp.DELETE);
+		}
 	}
 
+	private AsterixTupleFilterFactory createTupleFilterFactory(
+			IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+			ILogicalExpression filterExpr, JobGenContext context)
+			throws AlgebricksException {
+		ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+		IEvaluatorFactory filterEvalFactory = exprJobGen
+				.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
+						context);
+		return new AsterixTupleFilterFactory(filterEvalFactory,
+				context.getBinaryBooleanInspector());
+	}
+	
 	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(
 			String datasetName, String indexName,
 			IOperatorSchema propagatedSchema,
 			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
-			JobGenContext context, JobSpecification spec, IndexOp indexOp)
-			throws AlgebricksException {
+			List<LogicalVariable> secondaryKeys,
+			AsterixTupleFilterFactory filterFactory,
+			RecordDescriptor recordDesc, JobGenContext context,
+			JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
 		int numKeys = primaryKeys.size() + secondaryKeys.size();
 		// generate field permutations
 		int[] fieldPermutation = new int[numKeys];
@@ -956,7 +979,7 @@
 				appContext.getIndexRegistryProvider(),
 				splitsAndConstraint.first, typeTraits, comparatorFactories,
 				fieldPermutation, indexOp, new BTreeDataflowHelperFactory(),
-				null, NoOpOperationCallbackProvider.INSTANCE, txnId);
+				filterFactory, NoOpOperationCallbackProvider.INSTANCE, txnId);
 		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
 				btreeBulkLoad, splitsAndConstraint.second);
 	}
@@ -965,9 +988,10 @@
 			String datasetName, String indexName,
 			IOperatorSchema propagatedSchema,
 			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
-			JobGenContext context, JobSpecification spec, IndexOp indexOp)
-			throws AlgebricksException {
+			List<LogicalVariable> secondaryKeys,
+			AsterixTupleFilterFactory filterFactory,
+			RecordDescriptor recordDesc, JobGenContext context,
+			JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
 		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
 				.findDataset(datasetName);
 		String itemTypeName = compiledDatasetDecl.getItemTypeName();
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
index 7dd7ac1..8bb0b9f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.asterix.runtime.base;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java
new file mode 100644
index 0000000..ce5cefd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+
+public class AsterixTupleFilterFactory implements ITupleFilterFactory {
+
+    private static final long serialVersionUID = 1L;
+    
+    private final IBinaryBooleanInspector boolInspector;
+    private final IEvaluatorFactory evalFactory;
+    
+    public AsterixTupleFilterFactory(IEvaluatorFactory evalFactory,
+            IBinaryBooleanInspector boolInspector) throws AlgebricksException {
+        this.evalFactory = evalFactory;
+        this.boolInspector = boolInspector;
+    }
+    
+    @Override
+    public ITupleFilter createTupleFilter() throws Exception {
+        return new AsterixTupleFilter(evalFactory, boolInspector);
+    }
+
+}