Secondary BTree indexes can now be created on nullable fields (nulls are filtered during load). Still need to modify insert/delete jobgen to filter nulls as well.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@213 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index cb292a2..a16dcf4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -38,6 +38,12 @@
         // Assign op.
         AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
         
+        // If any of the secondary fields are nullable, then add a select op that filters nulls.
+        AlgebricksMetaOperatorDescriptor selectOp = null;
+        if (anySecondaryKeyIsNullable) {
+            selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
+        }
+        
         // Sort by secondary keys.
         ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
@@ -48,12 +54,16 @@
 
         // Connect the operators.
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);        
+        if (anySecondaryKeyIsNullable) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
+            spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0);
+        } else {
+            spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0);
+        }
         spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0);
         spec.addRoot(secondaryBulkLoadOp);
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
         return spec;
-
     }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index e331d10..81ba191 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -8,6 +8,7 @@
 import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
 import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
@@ -18,14 +19,19 @@
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
+import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
 import edu.uci.ics.asterix.translator.DmlTranslator.CompiledCreateIndexStatement;
 import edu.uci.ics.hyracks.algebricks.core.algebra.data.ISerializerDeserializerProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.evaluators.ColumnAccessEvalFactory;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.StreamSelectRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
@@ -64,6 +70,7 @@
     protected IFileSplitProvider primaryFileSplitProvider;
     protected AlgebricksPartitionConstraint primaryPartitionConstraint;
     protected String secondaryIndexName;
+    protected boolean anySecondaryKeyIsNullable = false;
 
     protected IBinaryComparatorFactory[] primaryComparatorFactories;
     protected RecordDescriptor primaryRecDesc;
@@ -153,7 +160,9 @@
         for (int i = 0; i < numSecondaryKeys; i++) {
             evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
                     numPrimaryKeys);
-            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), itemType);
+            Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
+            IAType keyType = keyTypePair.first;
+            anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
             ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
             secondaryRecFields[i] = keySerde;
             secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
@@ -260,4 +269,31 @@
                 secondarySplitsAndConstraint.second);
         return treeIndexBulkLoadOp;
     }
+    
+    public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields) throws AlgebricksException {
+        IEvaluatorFactory[] andArgsEvalFactories = new IEvaluatorFactory[numSecondaryKeyFields];
+        NotDescriptor notDesc = new NotDescriptor();
+        IsNullDescriptor isNullDesc = new IsNullDescriptor();
+        for (int i = 0; i < numSecondaryKeyFields; i++) {
+            // Access column i, and apply 'is not null'.
+            ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
+            IEvaluatorFactory isNullEvalFactory = isNullDesc.createEvaluatorFactory(new IEvaluatorFactory[] { columnAccessEvalFactory });
+            IEvaluatorFactory notEvalFactory = notDesc.createEvaluatorFactory(new IEvaluatorFactory[] { isNullEvalFactory });
+            andArgsEvalFactories[i] = notEvalFactory;
+        }
+        IEvaluatorFactory selectCond = null;
+        if (numSecondaryKeyFields > 1) {
+            // Create conjunctive condition where all secondary index keys must satisfy 'is not null'.
+            AndDescriptor andDesc = new AndDescriptor();
+            selectCond = andDesc.createEvaluatorFactory(andArgsEvalFactories);
+        } else {
+            selectCond = andArgsEvalFactories[0];
+        }
+        StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(selectCond, null, AqlBinaryBooleanInspectorImpl.INSTANCE);
+        AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+                new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
+                primaryPartitionConstraint);
+        return asterixSelectOp;
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index 6fb8ee0..c0de096 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -17,6 +17,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -50,7 +51,9 @@
                             + numSecondaryKeys
                             + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
         }
-        IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
+        Pair<IAType, Boolean> spatialTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+        IAType spatialType = spatialTypePair.first;
+        anySecondaryKeyIsNullable = spatialTypePair.second;
         if (spatialType == null) {
             throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
         }
@@ -63,8 +66,7 @@
         ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
                 + numNestedSecondaryKeyFields];
         ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
-        IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), itemType);
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
         for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
             ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
                     .getSerializerDeserializer(nestedKeyType);
diff --git a/asterix-app/src/test/resources/runtimets/queries/index/cust-index-age-nullable.aql b/asterix-app/src/test/resources/runtimets/queries/index/cust-index-age-nullable.aql
new file mode 100644
index 0000000..19348a6
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/index/cust-index-age-nullable.aql
@@ -0,0 +1,33 @@
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AddressType as open {
+  number: int32, 
+  street: string,
+  city: string
+}
+
+create type CustomerType as open {
+  cid: int32, 
+  name: string,
+  age: int32?,
+  address: AddressType?,
+  interests: {{string}},
+  children: [ { name: string, age: int32? } ]
+}
+
+create dataset Customers(CustomerType) partitioned by key cid;
+
+load dataset Customers 
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/semistructured/tiny01/customer.adm"),("format"="adm"));
+
+create index age_index on Customers(age);
+
+write output to nc1:"rttest/index_cust-index-age-nullable.adm";
+
+for $c in dataset('Customers')
+where $c.age < 20
+order by $c.cid
+return $c
diff --git a/asterix-app/src/test/resources/runtimets/queries/index/rtree-secondary-index-nullable.aql b/asterix-app/src/test/resources/runtimets/queries/index/rtree-secondary-index-nullable.aql
new file mode 100644
index 0000000..a4555f4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/index/rtree-secondary-index-nullable.aql
@@ -0,0 +1,30 @@
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point?,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData(MyRecord)
+  partitioned by key id;
+
+load dataset MyData 
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter" 
+(("path"="nc1://data/spatial/spatialDataNulls.json"),("format"="adm"));
+
+create index rtree_index_point on MyData(point) type rtree;
+
+write output to nc1:"rttest/index_rtree-secondary-index-nullable.adm";
+
+for $o in dataset('MyData')
+where spatial-intersect($o.point, create-polygon(create-point(4.0,1.0), create-point(4.0,4.0), create-point(12.0,4.0), create-point(12.0,1.0)))
+order by $o.id
+return {"id":$o.id}
diff --git a/asterix-app/src/test/resources/runtimets/queries/index/rtree-secondary-index.aql b/asterix-app/src/test/resources/runtimets/queries/index/rtree-secondary-index.aql
index 6dab809..01b2981 100644
--- a/asterix-app/src/test/resources/runtimets/queries/index/rtree-secondary-index.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/index/rtree-secondary-index.aql
@@ -22,7 +22,6 @@
 
 create index rtree_index_point on MyData(point) type rtree;
 
-
 write output to nc1:"rttest/index_rtree-secondary-index.adm";
 
 for $o in dataset('MyData')
diff --git a/asterix-app/src/test/resources/runtimets/results/index/cust-index-age-nullable.adm b/asterix-app/src/test/resources/runtimets/results/index/cust-index-age-nullable.adm
new file mode 100644
index 0000000..f5b5521
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/index/cust-index-age-nullable.adm
@@ -0,0 +1,2 @@
+{ "cid": 92, "name": "Kenny Laychock", "age": 15, "address": { "number": 4790, "street": "Washington St.", "city": "Portland" }, "interests": {{ "Video Games", "Basketball" }}, "children": [  ] }
+{ "cid": 112, "name": "Dorie Lave", "age": 10, "address": { "number": 2286, "street": "Lake St.", "city": "Los Angeles" }, "interests": {{ "Coffee" }}, "children": [ { "name": "Grady Lave", "age": null }, { "name": "Daysi Lave", "age": null } ] }
diff --git a/asterix-app/src/test/resources/runtimets/results/index/rtree-secondary-index-nullable.adm b/asterix-app/src/test/resources/runtimets/results/index/rtree-secondary-index-nullable.adm
new file mode 100644
index 0000000..2ffad7c
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/index/rtree-secondary-index-nullable.adm
@@ -0,0 +1 @@
+{ "id": 12 }
\ No newline at end of file