Merge branch 'peili/crash_recovery_tests' into salsubaiee/master_fix_search_comparators
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java
index 5ce62d7..34a008f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexOperationsHelper.java
@@ -233,7 +233,11 @@
     protected void setSecondaryRecDescAndComparators(IndexType indexType, List<String> secondaryKeyFields,
             int gramLength, AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
         secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
-        secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        if (indexType == IndexType.RTREE) {
+            secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
+        } else {
+            secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+        }
         secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
         ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
         ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
@@ -257,7 +261,9 @@
         for (int i = 0; i < numPrimaryKeys; i++) {
             secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
             secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
-            secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+            if (indexType != IndexType.RTREE) {
+                secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+            }
         }
         secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index d29e2c5..1fb64be 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -141,6 +141,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -148,6 +149,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
@@ -499,26 +501,33 @@
             }
             int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
             RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-            ITypeTraits[] typeTraits = null;
             int[] bloomFilterKeyFields;
+            ITypeTraits[] typeTraits;
+            IBinaryComparatorFactory[] comparatorFactories;
             if (isSecondary) {
                 Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                         dataset.getDatasetName(), indexName);
                 int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
-                typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, 0, outputVars.size(), typeEnv, context);
                 bloomFilterKeyFields = new int[numSecondaryKeys];
                 for (int i = 0; i < numSecondaryKeys; i++) {
                     bloomFilterKeyFields[i] = i;
                 }
-            } else {
                 typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, 0, outputVars.size(), typeEnv, context);
+                comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(outputVars, 0,
+                        outputVars.size(), typeEnv, context);
+            } else {
                 bloomFilterKeyFields = new int[numPrimaryKeys];
                 for (int i = 0; i < numPrimaryKeys; i++) {
                     bloomFilterKeyFields[i] = i;
                 }
+                String itemTypeName = dataset.getItemTypeName();
+                ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+                        dataset.getDataverseName(), itemTypeName).getDatatype();
+
+                typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
+                comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset, itemType,
+                        context.getBinaryComparatorFactoryProvider());
             }
-            IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
-                    outputVars, 0, outputVars.size(), typeEnv, context);
 
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
@@ -612,13 +621,14 @@
             IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
                     outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
             ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
-                    numNestedSecondaryKeyFields, typeEnv, context);
+                    numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
                     dataset.getDataverseName(), dataset.getDatasetName(), indexName);
 
             IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
                     dataset, recType, context.getBinaryComparatorFactoryProvider());
+
             IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
@@ -1132,7 +1142,8 @@
             ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
             ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
             IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
-            IBinaryComparatorFactory[] invListComparatorFactories = new IBinaryComparatorFactory[primaryKeys.size()];
+            IBinaryComparatorFactory[] invListComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                    dataset, recType, context.getBinaryComparatorFactoryProvider());
 
             IAType secondaryKeyType = null;
             for (i = 0; i < secondaryKeys.size(); ++i) {
@@ -1173,15 +1184,27 @@
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
                     dataset, mdTxnCtx);
+            IIndexDataflowHelperFactory indexDataFlowFactory;
+            if (!isPartitioned) {
+                indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
+                        datasetId), compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate());
+            } else {
+                indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                        new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
+                        storageProperties.getBloomFilterFalsePositiveRate());
+            }
             AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(
                     spec, recordDesc, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
                     appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                     invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
-                    new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
-                            compactionInfo.first, compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, storageProperties
-                                    .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
+                    indexDataFlowFactory, filterFactory, modificationCallbackFactory);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
                     splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -1214,7 +1237,7 @@
             int numPrimaryKeys = primaryKeys.size();
             int numKeys = numSecondaryKeys + numPrimaryKeys;
             ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+            IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
             int[] fieldPermutation = new int[numKeys];
             int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
             int i = 0;
@@ -1243,8 +1266,6 @@
             List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
             for (String partitioningKey : partitioningKeys) {
                 IAType keyType = recType.getFieldType(partitioningKey);
-                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                        keyType, true);
                 typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
                 ++i;
             }