Checkpointing progress towards integrating length-partitioned fuzzy indexes into Asterix. Basic bulk load and search seem to work. Needs testing.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_length_filter@927 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index c631530..f711a38 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -44,6 +44,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
@@ -56,8 +58,12 @@
  * inverted-index search.
  */
 public class InvertedIndexPOperator extends IndexSearchPOperator {
-    public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+    private final boolean isPartitioned;
+
+    public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+            boolean isPartitioned) {
         super(idx, requiresBroadcast);
+        this.isPartitioned = isPartitioned;
     }
 
     @Override
@@ -139,18 +145,23 @@
             }
 
             // TODO: For now we assume the type of the generated tokens is the
-            // same
-            // as the indexed field.
+            // same as the indexed field.
             // We need a better way of expressing this because tokens may be
-            // hashed,
-            // or an inverted-index may index a list type, etc.
-            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numSecondaryKeys];
-            IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
+            // hashed, or an inverted-index may index a list type, etc.
+            int numTokenKeys = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
+            ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenKeys];
+            IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenKeys];
             for (int i = 0; i < numSecondaryKeys; i++) {
                 tokenComparatorFactories[i] = InvertedIndexAccessMethod
                         .getTokenBinaryComparatorFactory(secondaryKeyType);
                 tokenTypeTraits[i] = InvertedIndexAccessMethod.getTokenTypeTrait(secondaryKeyType);
             }
+            if (isPartitioned) {
+                // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+                tokenComparatorFactories[numSecondaryKeys] = PointableBinaryComparatorFactory
+                        .of(ShortPointable.FACTORY);
+                tokenTypeTraits[numSecondaryKeys] = ShortPointable.TYPE_TRAITS;
+            }
 
             IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
             List<LogicalVariable> outputVars = unnestMap.getVariables();
@@ -170,9 +181,6 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
                     .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
                             datasetName, indexName);
-            Pair<IFileSplitProvider, IFileSplitProvider> fileSplitProviders = metadataProvider
-                    .getInvertedIndexFileSplitProviders(secondarySplitsAndConstraint.first);
-
             // TODO: Here we assume there is only one search key field.
             int queryField = keyFields[0];
             // Get tokenizer and search modifier factories.
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 19791e3..066de80 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -173,12 +173,14 @@
                                 op.setPhysicalOperator(new RTreeSearchPOperator(dsi, requiresBroadcast));
                                 break;
                             }
-                            case WORD_INVIX: {
-                                op.setPhysicalOperator(new InvertedIndexPOperator(dsi, requiresBroadcast));
+                            case WORD_INVIX:
+                            case NGRAM_INVIX: {
+                                op.setPhysicalOperator(new InvertedIndexPOperator(dsi, requiresBroadcast, false));
                                 break;
                             }
-                            case NGRAM_INVIX: {
-                                op.setPhysicalOperator(new InvertedIndexPOperator(dsi, requiresBroadcast));
+                            case FUZZY_WORD_INVIX:
+                            case FUZZY_NGRAM_INVIX: {
+                                op.setPhysicalOperator(new InvertedIndexPOperator(dsi, requiresBroadcast, true));
                                 break;
                             }
                             default: {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
index 3215707..f62abb9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -132,7 +132,9 @@
         switch (index.getIndexType()) {
             case BTREE:
             case WORD_INVIX:
-            case NGRAM_INVIX: {
+            case NGRAM_INVIX: 
+            case FUZZY_WORD_INVIX:
+            case FUZZY_NGRAM_INVIX: {
                 return index.getKeyFieldNames().size();
             }
             case RTREE: {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 3867baa..f1a9f9b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -657,7 +657,7 @@
             }
             // We can only optimize edit distance on lists using a word index.
             if ((listOrStrObj.getType().getTypeTag() == ATypeTag.ORDEREDLIST || listOrStrObj.getType().getTypeTag() == ATypeTag.UNORDEREDLIST)
-                    && index.getIndexType() == IndexType.WORD_INVIX) {
+                    && (index.getIndexType() == IndexType.WORD_INVIX || index.getIndexType() == IndexType.FUZZY_WORD_INVIX)) {
                 IACollection alist = (IACollection) listOrStrObj;
                 // Compute merge threshold.
                 mergeThreshold = alist.size() - edThresh.getIntegerValue();
@@ -683,11 +683,11 @@
                 AbstractFunctionCallExpression nonConstfuncExpr = (AbstractFunctionCallExpression) nonConstArg;
                 // We can use this index if the tokenization function matches the index type.
                 if (nonConstfuncExpr.getFunctionIdentifier() == AsterixBuiltinFunctions.WORD_TOKENS
-                        && index.getIndexType() == IndexType.WORD_INVIX) {
+                        && (index.getIndexType() == IndexType.WORD_INVIX || index.getIndexType() == IndexType.FUZZY_WORD_INVIX)) {
                     return true;
                 }
                 if (nonConstfuncExpr.getFunctionIdentifier() == AsterixBuiltinFunctions.GRAM_TOKENS
-                        && index.getIndexType() == IndexType.NGRAM_INVIX) {
+                        && (index.getIndexType() == IndexType.NGRAM_INVIX || index.getIndexType() == IndexType.FUZZY_NGRAM_INVIX)) {
                     return true;
                 }
             }
@@ -700,7 +700,7 @@
         }
         // We can only optimize contains with ngram indexes.
         if (optFuncExpr.getFuncExpr().getFunctionIdentifier() == AsterixBuiltinFunctions.CONTAINS
-                && index.getIndexType() == IndexType.NGRAM_INVIX) {
+                && (index.getIndexType() == IndexType.NGRAM_INVIX || index.getIndexType() == IndexType.FUZZY_NGRAM_INVIX)) {
             // Check that the constant search string has at least gramLength characters.
             AsterixConstantValue strConstVal = (AsterixConstantValue) optFuncExpr.getConstantVal(0);
             IAObject strObj = strConstVal.getObject();
@@ -753,10 +753,12 @@
     public static IBinaryTokenizerFactory getBinaryTokenizerFactory(SearchModifierType searchModifierType,
             ATypeTag searchKeyType, Index index) throws AlgebricksException {
         switch (index.getIndexType()) {
-            case WORD_INVIX: {
+            case WORD_INVIX:
+            case FUZZY_WORD_INVIX: {
                 return AqlBinaryTokenizerFactoryProvider.INSTANCE.getWordTokenizerFactory(searchKeyType, false);
             }
-            case NGRAM_INVIX: {
+            case NGRAM_INVIX:
+            case FUZZY_NGRAM_INVIX: {
                 // Make sure not to use pre- and postfixing for conjunctive searches.
                 boolean prePost = (searchModifierType == SearchModifierType.CONJUNCTIVE) ? false : true;
                 return AqlBinaryTokenizerFactoryProvider.INSTANCE.getNGramTokenizerFactory(searchKeyType,
@@ -771,10 +773,12 @@
     public static IBinaryTokenizerFactory getBinaryTokenizerFactory(ATypeTag keyType, IndexType indexType,
             int gramLength) throws AlgebricksException {
         switch (indexType) {
-            case WORD_INVIX: {
+            case WORD_INVIX:
+            case FUZZY_WORD_INVIX: {
                 return AqlBinaryTokenizerFactoryProvider.INSTANCE.getWordTokenizerFactory(keyType, false);
             }
-            case NGRAM_INVIX: {
+            case NGRAM_INVIX:
+            case FUZZY_NGRAM_INVIX: {
                 return AqlBinaryTokenizerFactoryProvider.INSTANCE.getNGramTokenizerFactory(keyType, gramLength, true,
                         false);
             }
@@ -797,11 +801,13 @@
             case EDIT_DISTANCE: {
                 int edThresh = ((AInt32) simThresh).getIntegerValue();
                 switch (index.getIndexType()) {
-                    case NGRAM_INVIX: {
+                    case NGRAM_INVIX:
+                    case FUZZY_NGRAM_INVIX: {
                         // Edit distance on strings, filtered with overlapping grams.
                         return new EditDistanceSearchModifierFactory(index.getGramLength(), edThresh);
                     }
-                    case WORD_INVIX: {
+                    case WORD_INVIX:
+                    case FUZZY_WORD_INVIX: {
                         // Edit distance on two lists. The list-elements are non-overlapping.
                         return new ListEditDistanceSearchModifierFactory(edThresh);
                     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index a92afd5..080b103 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -101,7 +101,9 @@
                 break;
             }
             case WORD_INVIX:
-            case NGRAM_INVIX: {
+            case NGRAM_INVIX:
+            case FUZZY_WORD_INVIX:
+            case FUZZY_NGRAM_INVIX: {
                 indexCreator = new SecondaryInvertedIndexCreator(physOptConf);
                 break;
             }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 1be74cf..5e9a214 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -2,6 +2,7 @@
 
 import java.util.List;
 
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
 import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
@@ -24,15 +25,20 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
 import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
@@ -48,7 +54,8 @@
     private int numTokenKeyPairFields;
     private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
     private RecordDescriptor tokenKeyPairRecDesc;
-
+    private boolean isPartitioned;
+    
     protected SecondaryInvertedIndexCreator(PhysicalOptimizationConfig physOptConf) {
         super(physOptConf);
     }
@@ -64,6 +71,11 @@
         if (numSecondaryKeys > 1) {
             throw new AsterixException("Cannot create composite inverted index on multiple fields.");
         }
+        if (createIndexStmt.getIndexType() == IndexType.FUZZY_WORD_INVIX || createIndexStmt.getIndexType() == IndexType.FUZZY_NGRAM_INVIX) {
+            isPartitioned = true;
+        } else {
+            isPartitioned = false;
+        }
         // Prepare record descriptor used in the assign op, and the optional
         // select op.
         List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
@@ -84,10 +96,16 @@
         }
         secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
         // Comparators and type traits for tokens.
-        tokenComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
-        tokenTypeTraits = new ITypeTraits[numSecondaryKeys];
+        int numTokenFields = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
+        tokenComparatorFactories = new IBinaryComparatorFactory[numTokenFields];
+        tokenTypeTraits = new ITypeTraits[numTokenFields];
         tokenComparatorFactories[0] = InvertedIndexAccessMethod.getTokenBinaryComparatorFactory(secondaryKeyType);
         tokenTypeTraits[0] = InvertedIndexAccessMethod.getTokenTypeTrait(secondaryKeyType);
+        if (isPartitioned) {
+            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
+            tokenComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
+        }
         // Set tokenizer factory.
         // TODO: We might want to expose the hashing option at the AQL level,
         // and add the choice to the index metadata.
@@ -100,8 +118,8 @@
             invListsTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
         }
         // For tokenization, sorting and loading.
-        // One token + primary keys.
-        numTokenKeyPairFields = 1 + numPrimaryKeys;
+        // One token (+ optional partitioning field) + primary keys.
+        numTokenKeyPairFields = (!isPartitioned) ? 1 + numPrimaryKeys : 2 + numPrimaryKeys;
         ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
         ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
         tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
@@ -109,10 +127,17 @@
         tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
         tokenKeyPairComparatorFactories[0] = InvertedIndexAccessMethod
                 .getTokenBinaryComparatorFactory(secondaryKeyType);
+        int pkOff = 1;
+        if (isPartitioned) {
+            tokenKeyPairFields[1] = ShortSerializerDeserializer.INSTANCE;
+            tokenKeyPairTypeTraits[1] = tokenTypeTraits[1];
+            tokenKeyPairComparatorFactories[1] = PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+            pkOff = 2;
+        }
         for (int i = 0; i < numPrimaryKeys; i++) {
-            tokenKeyPairFields[i + 1] = primaryRecDesc.getFields()[i];
-            tokenKeyPairTypeTraits[i + 1] = primaryRecDesc.getTypeTraits()[i];
-            tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
+            tokenKeyPairFields[i + pkOff] = primaryRecDesc.getFields()[i];
+            tokenKeyPairTypeTraits[i + pkOff] = primaryRecDesc.getTypeTraits()[i];
+            tokenKeyPairComparatorFactories[i + pkOff] = primaryComparatorFactories[i];
         }
         tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
     }
@@ -120,17 +145,14 @@
     @Override
     public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
         JobSpecification spec = new JobSpecification();
-        //TODO replace the transient one to persistent one
+        // TODO: replace the transient one to persistent one.
         ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
         LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
                 AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
                 AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
-                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
-                new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER), localResourceFactoryProvider,
-                NoOpOperationCallbackFactory.INSTANCE);
+                invListsTypeTraits, primaryComparatorFactories, tokenizerFactory, dataflowHelperFactory,
+                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
                 secondaryPartitionConstraint);
         spec.addRoot(invIndexCreateOp);
@@ -184,16 +206,13 @@
     }
 
     private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
-        int[] fieldsToTokenize = new int[numSecondaryKeys];
-        for (int i = 0; i < numSecondaryKeys; i++) {
-            fieldsToTokenize[i] = i;
-        }
+        int docField = 0;
         int[] primaryKeyFields = new int[numPrimaryKeys];
         for (int i = 0; i < numPrimaryKeys; i++) {
             primaryKeyFields[i] = numSecondaryKeys + i;
         }
         BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenKeyPairRecDesc, tokenizerFactory, fieldsToTokenize, primaryKeyFields);
+                tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, isPartitioned);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
                 primaryPartitionConstraint);
         return tokenizerOp;
@@ -214,21 +233,34 @@
     }
 
     private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
-        int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
+        int[] fieldPermutation = new int[numTokenKeyPairFields];
         for (int i = 0; i < numTokenKeyPairFields; i++) {
             fieldPermutation[i] = i;
         }
+        IIndexDataflowHelperFactory dataflowHelperFactory = createDataflowHelperFactory();
         LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
                 spec, fieldPermutation, false, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
                 secondaryFileSplitProvider, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
-                new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-                        AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER),
-                NoOpOperationCallbackFactory.INSTANCE);
+                dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
                 secondaryPartitionConstraint);
         return invIndexBulkLoadOp;
     }
+    
+    private IIndexDataflowHelperFactory createDataflowHelperFactory() {
+        if (!isPartitioned) {
+            return new LSMInvertedIndexDataflowHelperFactory(
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
+        } else {
+            return new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
+                    AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
+        }
+    }
 }
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index eaa4260..59a7753 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -398,9 +398,10 @@
   	("," <IDENTIFIER> { cis.addFieldExpr(token.image); })*
   <RIGHTPAREN>
     ("type"
-  		("btree" { cis.setIndexType(IndexType.BTREE); }
-  		| "keyword" { cis.setIndexType(IndexType.WORD_INVIX); }  		
+  		("btree" { cis.setIndexType(IndexType.BTREE); }  		
   		| "rtree" { cis.setIndexType(IndexType.RTREE); }
+  		| "keyword" { cis.setIndexType(IndexType.WORD_INVIX); }
+  		| "fuzzy keyword" { cis.setIndexType(IndexType.FUZZY_WORD_INVIX); }
   		| "ngram"
   		  <LEFTPAREN>
   		  (<INTEGER_LITERAL>
@@ -409,7 +410,16 @@
   		      cis.setGramLength(Integer.valueOf(token.image));
   		    }
   		  )
-  		  <RIGHTPAREN>	  		  
+  		  <RIGHTPAREN>
+  		| "fuzzy ngram"
+  		  <LEFTPAREN>
+  		  (<INTEGER_LITERAL>
+  		    {
+  		      cis.setIndexType(IndexType.FUZZY_NGRAM_INVIX);
+  		      cis.setGramLength(Integer.valueOf(token.image));
+  		    }
+  		  )
+  		  <RIGHTPAREN>
 		)
   	";"  	
   	| ";"
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/DatasetConfig.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/DatasetConfig.java
index e860a0a..cbe96f7 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/DatasetConfig.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/DatasetConfig.java
@@ -41,7 +41,9 @@
         BTREE,        
         RTREE,
         WORD_INVIX,
-        NGRAM_INVIX
+        NGRAM_INVIX,
+        FUZZY_WORD_INVIX,
+        FUZZY_NGRAM_INVIX
     }
 
 }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/common/ListEditDistanceSearchModifier.java b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/common/ListEditDistanceSearchModifier.java
index 3d66912..cf3bf9f 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/common/ListEditDistanceSearchModifier.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/common/ListEditDistanceSearchModifier.java
@@ -25,7 +25,17 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        return numInvLists - occurrenceThreshold + 1;
+    }
+
+    @Override
+    public short getNumTokensLowerBound(short numQueryTokens) {
+        return (short) (numQueryTokens - edThresh);
+    }
+
+    @Override
+    public short getNumTokensUpperBound(short numQueryTokens) {
+        return (short) (numQueryTokens + edThresh);
     }
 }