Fixed secondary indexes comparators in search plans. Fixed null pointer exception in inverted index dml ops.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 17d4b73..1fb64be 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -141,6 +141,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -148,6 +149,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
@@ -500,6 +502,8 @@
             int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
             RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
             int[] bloomFilterKeyFields;
+            ITypeTraits[] typeTraits;
+            IBinaryComparatorFactory[] comparatorFactories;
             if (isSecondary) {
                 Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                         dataset.getDatasetName(), indexName);
@@ -508,21 +512,23 @@
                 for (int i = 0; i < numSecondaryKeys; i++) {
                     bloomFilterKeyFields[i] = i;
                 }
+                typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, 0, outputVars.size(), typeEnv, context);
+                comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(outputVars, 0,
+                        outputVars.size(), typeEnv, context);
             } else {
                 bloomFilterKeyFields = new int[numPrimaryKeys];
                 for (int i = 0; i < numPrimaryKeys; i++) {
                     bloomFilterKeyFields[i] = i;
                 }
+                String itemTypeName = dataset.getItemTypeName();
+                ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+                        dataset.getDataverseName(), itemTypeName).getDatatype();
+
+                typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
+                        context.getBinaryComparatorFactoryProvider());
             }
 
-            String itemTypeName = dataset.getItemTypeName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
-                    dataset.getDataverseName(), itemTypeName).getDatatype();
-
-            ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
-            IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, context.getBinaryComparatorFactoryProvider());
-
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             try {
@@ -615,13 +621,14 @@
             IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
                     outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
             ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
-                    numNestedSecondaryKeyFields, typeEnv, context);
+                    numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
                     dataset.getDataverseName(), dataset.getDatasetName(), indexName);
 
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
                     dataset, recType, context.getBinaryComparatorFactoryProvider());
+
             IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
@@ -1135,7 +1142,8 @@
             ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
             ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
             IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
-            IBinaryComparatorFactory[] invListComparatorFactories = new IBinaryComparatorFactory[primaryKeys.size()];
+            IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                    dataset, recType, context.getBinaryComparatorFactoryProvider());
 
             IAType secondaryKeyType = null;
             for (i = 0; i < secondaryKeys.size(); ++i) {
@@ -1176,15 +1184,27 @@
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory indexDataFlowFactory;
+            if (!isPartitioned) {
+                indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                        datasetId), compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate());
+            } else {
+                indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                        new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate());
+            }
             AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(
                     spec, recordDesc, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
                     appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                     invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
-                    new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
-                            compactionInfo.first, compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, storageProperties
-                                    .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
+                    indexDataFlowFactory, filterFactory, modificationCallbackFactory);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
                     splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -1217,7 +1237,7 @@
             int numPrimaryKeys = primaryKeys.size();
             int numKeys = numSecondaryKeys + numPrimaryKeys;
             ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
             int[] fieldPermutation = new int[numKeys];
             int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
             int i = 0;
@@ -1246,8 +1266,6 @@
             List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
             for (String partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getFieldType(partitioningKey);
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                        keyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }