Implemented dataflow components for length-partitioned inverted indexes. Added integration test.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2487 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
new file mode 100644
index 0000000..05b7a26
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.am.invertedindex;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.test.support.TestIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
+import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractfWordInvertedIndexTest extends AbstractIntegrationTest {
+ static {
+ TestStorageManagerComponentHolder.init(8192, 20, 20);
+ }
+
+ protected static final int MERGE_THRESHOLD = 3;
+
+ protected IStorageManagerInterface storageManager = new TestStorageManagerInterface();
+ protected IIndexLifecycleManagerProvider lcManagerProvider = new TestIndexLifecycleManagerProvider();
+ protected IIndexDataflowHelperFactory btreeDataflowHelperFactory = new BTreeDataflowHelperFactory();
+ protected IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory;
+
+ protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+ protected final static String sep = System.getProperty("file.separator");
+ protected final String dateString = simpleDateFormat.format(new Date());
+ protected final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
+ protected final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
+
+ protected IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
+ protected IFileSplitProvider btreeFileSplitProvider = new ConstantFileSplitProvider(
+ new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(btreeFileName))) });
+
+ // Primary BTree index.
+ protected int primaryFieldCount = 2;
+ protected ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+ protected int primaryKeyFieldCount = 1;
+ protected IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
+ protected RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ // Inverted index BTree dictionary.
+ protected ITypeTraits[] tokenTypeTraits;
+ protected IBinaryComparatorFactory[] tokenComparatorFactories;
+
+ // Inverted index stuff.
+ protected int invListElementFieldCount = 1;
+ protected ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
+ protected IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
+ protected RecordDescriptor invListsRecDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+ protected RecordDescriptor tokenizerRecDesc;
+
+ // Tokenizer stuff.
+ protected ITokenFactory tokenFactory = new UTF8WordTokenFactory();
+ protected IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
+ tokenFactory);
+
+ // Comparators for the external sort that feeds the inverted-index bulk load.
+ IBinaryComparatorFactory[] sortComparatorFactories;
+
+ @Before
+ public void setup() throws Exception {
+ prepare();
+
+ // Field declarations and comparators for primary BTree index.
+ primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+ primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+ primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+ // Field declarations and comparators for inverted lists.
+ invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+ invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+ createPrimaryIndex();
+ loadPrimaryIndex();
+ printPrimaryIndex();
+ createInvertedIndex();
+ loadInvertedIndex();
+ }
+
+ protected abstract void prepare();
+
+ protected abstract boolean addNumTokensKey();
+
+ public void createPrimaryIndex() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
+ TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+ lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+ btreeDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+ spec.addRoot(primaryCreateOp);
+ runTest(spec);
+ }
+
+ public void createInvertedIndex() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
+ LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
+ storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits, tokenComparatorFactories,
+ invListsTypeTraits, invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory,
+ localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexCreateOp, NC1_ID);
+ spec.addRoot(invIndexCreateOp);
+ runTest(spec);
+ }
+
+ @Test
+ public void testConjunctiveSearcher() throws Exception {
+ IInvertedIndexSearchModifierFactory conjunctiveSearchModifierFactory = new ConjunctiveSearchModifierFactory();
+ searchInvertedIndex("of", conjunctiveSearchModifierFactory);
+ searchInvertedIndex("3d", conjunctiveSearchModifierFactory);
+ searchInvertedIndex("of the human", conjunctiveSearchModifierFactory);
+ }
+
+ private IOperatorDescriptor createFileScanOp(JobSpecification spec) {
+ FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/cleanednumbereddblptitles.txt"))) };
+ IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
+ RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
+ return dblpTitleScanner;
+ }
+
+ private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
+ int[] fieldPermutation = { 0, 1 };
+ TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
+ NoOpOperationCallbackFactory.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
+ return primaryBtreeBulkLoad;
+ }
+
+ private IOperatorDescriptor createScanKeyProviderOp(JobSpecification spec) throws HyracksDataException {
+ // Build a dummy tuple with a single placeholder field; the scan itself uses no key fields.
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
+ return keyProviderOp;
+ }
+
+ private IOperatorDescriptor createPrimaryScanOp(JobSpecification spec) throws HyracksDataException {
+ int[] lowKeyFields = null; // - infinity
+ int[] highKeyFields = null; // + infinity
+ BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+ storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory, false,
+ NoOpOperationCallbackFactory.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
+ return primaryBtreeSearchOp;
+ }
+
+ private void loadPrimaryIndex() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ // Assuming that the data is pre-sorted on the key. No need to sort
+ // before bulk load.
+ IOperatorDescriptor fileScanOp = createFileScanOp(spec);
+ IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
+ spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
+ spec.addRoot(primaryBulkLoad);
+ runTest(spec);
+ }
+
+ private void printPrimaryIndex() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
+ IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, printer, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
+ RecordDescriptor outputRecDesc) {
+ ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields,
+ sortComparatorFactories, outputRecDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
+ return externalSortOp;
+ }
+
+ private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int docField, int[] keyFields) {
+ BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
+ tokenizerRecDesc, tokenizerFactory, docField, keyFields, addNumTokensKey());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
+ return binaryTokenizer;
+ }
+
+ private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
+ LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
+ spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
+ tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
+ tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
+ return invIndexBulkLoadOp;
+ }
+
+ public void loadInvertedIndex() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
+ IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
+ int docField = 1;
+ int[] keyFields = { 0 };
+ IOperatorDescriptor binaryTokenizerOp = createBinaryTokenizerOp(spec, docField, keyFields);
+ int[] sortFields = new int[sortComparatorFactories.length];
+ int[] fieldPermutation = new int[sortComparatorFactories.length];
+ for (int i = 0; i < sortFields.length; i++) {
+ sortFields[i] = i;
+ fieldPermutation[i] = i;
+ }
+ IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
+ IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
+ spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
+ spec.addRoot(invIndexBulkLoadOp);
+ runTest(spec);
+ }
+
+ private IOperatorDescriptor createQueryProviderOp(JobSpecification spec, String queryString)
+ throws HyracksDataException {
+ // Build a tuple with exactly one field, which is the query.
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize(queryString, dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] querySerde = { UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor queryRecDesc = new RecordDescriptor(querySerde);
+ ConstantTupleSourceOperatorDescriptor queryProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ queryRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, queryProviderOp, NC1_ID);
+ return queryProviderOp;
+ }
+
+ private IOperatorDescriptor createInvertedIndexSearchOp(JobSpecification spec,
+ IInvertedIndexSearchModifierFactory searchModifierFactory) {
+ LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(spec,
+ 0, storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
+ invertedIndexDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc, false,
+ NoOpOperationCallbackFactory.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexSearchOp, NC1_ID);
+ return invIndexSearchOp;
+ }
+
+ public void searchInvertedIndex(String queryString, IInvertedIndexSearchModifierFactory searchModifierFactory)
+ throws Exception {
+ JobSpecification spec = new JobSpecification();
+ IOperatorDescriptor queryProviderOp = createQueryProviderOp(spec, queryString);
+ IOperatorDescriptor invIndexSearchOp = createInvertedIndexSearchOp(spec, searchModifierFactory);
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+ spec.connect(new OneToOneConnectorDescriptor(spec), queryProviderOp, 0, invIndexSearchOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), invIndexSearchOp, 0, printer, 0);
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
index 76155ae..47480da 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
@@ -11,6 +11,7 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
@@ -33,6 +34,15 @@
@Test
public void tokenizerTest() throws Exception {
+ test(false);
+ }
+
+ @Test
+ public void tokenizerWithNumTokensTest() throws Exception {
+ test(true);
+ }
+
+ private void test(boolean addNumTokensKey) throws Exception {
JobSpecification spec = new JobSpecification();
FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
@@ -46,16 +56,22 @@
UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
- RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ RecordDescriptor tokenizerRecDesc;
+ if (!addNumTokensKey) {
+ tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+ } else {
+ tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, ShortSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+ }
ITokenFactory tokenFactory = new UTF8WordTokenFactory();
IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
tokenFactory);
- int[] tokenFields = { 1 };
int[] keyFields = { 0 };
BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
- tokenizerRecDesc, tokenizerFactory, tokenFields, keyFields);
+ tokenizerRecDesc, tokenizerFactory, 1, keyFields, addNumTokensKey);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java
deleted file mode 100644
index d108289..0000000
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package edu.uci.ics.hyracks.tests.am.invertedindex;
-
-import java.io.File;
-
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
-
-public class InvertedIndexOperatorsTest extends AbstractIntegrationTest {
-
- @Test
- public void tokenizerTest() throws Exception {
- JobSpecification spec = new JobSpecification();
-
- FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/cleanednumbereddblptitles.txt"))) };
- IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
- RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
- FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
-
- RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
- ITokenFactory tokenFactory = new UTF8WordTokenFactory();
- IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
- tokenFactory);
- int[] tokenFields = { 1 };
- int[] projFields = { 0 };
- BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
- tokenizerRecDesc, tokenizerFactory, tokenFields, projFields);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
-
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), dblpTitleScanner, 0, binaryTokenizer, 0);
-
- spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizer, 0, printer, 0);
-
- spec.addRoot(printer);
- runTest(spec);
- }
-}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
new file mode 100644
index 0000000..1bd6878
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.am.invertedindex;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+
+public class PartitionedWordInvertedIndexTest extends AbstractfWordInvertedIndexTest {
+
+ @Override
+ protected void prepare() {
+ // Field declarations and comparators for tokens.
+ tokenTypeTraits = new ITypeTraits[] { UTF8StringPointable.TYPE_TRAITS, ShortPointable.TYPE_TRAITS };
+ tokenComparatorFactories = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(ShortPointable.FACTORY) };
+
+ tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, ShortSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE });
+
+ sortComparatorFactories = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(ShortPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
+
+ invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+ new FlushControllerProvider(), new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE,
+ MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE,
+ ImmediateSchedulerProvider.INSTANCE);
+ }
+
+ @Override
+ protected boolean addNumTokensKey() {
+ return true;
+ }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index a63cc07..69c7c90 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -15,332 +15,44 @@
package edu.uci.ics.hyracks.tests.am.invertedindex;
-import java.io.DataOutput;
-import java.io.File;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifierFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
-import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.test.support.TestIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
-import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
-@SuppressWarnings("rawtypes")
-public class WordInvertedIndexTest extends AbstractIntegrationTest {
- static {
- TestStorageManagerComponentHolder.init(8192, 20, 20);
- }
+public class WordInvertedIndexTest extends AbstractfWordInvertedIndexTest {
- private static final int MERGE_THRESHOLD = 3;
-
- private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
- private IIndexLifecycleManagerProvider lcManagerProvider = new TestIndexLifecycleManagerProvider();
- private IIndexDataflowHelperFactory btreeDataflowHelperFactory = new BTreeDataflowHelperFactory();
- private IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
- new FlushControllerProvider(), new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE,
- MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE, ImmediateSchedulerProvider.INSTANCE);
-
- private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
- private final static String sep = System.getProperty("file.separator");
- private final String dateString = simpleDateFormat.format(new Date());
- private final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
- private final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
-
- private IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
- new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
- private IFileSplitProvider btreeFileSplitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(
- NC1_ID, new FileReference(new File(btreeFileName))) });
-
- // Primary BTree index.
- private int primaryFieldCount = 2;
- private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
- private int primaryKeyFieldCount = 1;
- private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
- private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
- // Inverted index BTree dictionary.
- private ITypeTraits[] tokenTypeTraits = new ITypeTraits[1];
- private IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[1];
-
- // Inverted index stuff.
- private int invListElementFieldCount = 1;
- private ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
- private IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
- private RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- private RecordDescriptor invListsRecDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
-
- // Tokenizer stuff.
- private ITokenFactory tokenFactory = new UTF8WordTokenFactory();
- private IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
- tokenFactory);
-
- @Before
- public void setup() throws Exception {
- // Field declarations and comparators for primary BTree index.
- primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
- primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
- primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
-
+ @Override
+ protected void prepare() {
// Field declarations and comparators for tokens.
- tokenTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
- tokenComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+ tokenTypeTraits = new ITypeTraits[] { UTF8StringPointable.TYPE_TRAITS }; // single token field: the UTF-8 string
+ tokenComparatorFactories = new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) };
- // Field declarations and comparators for inverted lists.
- invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
- invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+ tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
- createPrimaryIndex();
- loadPrimaryIndex();
- printPrimaryIndex();
- createInvertedIndex();
- loadInvertedIndex();
+ sortComparatorFactories = new IBinaryComparatorFactory[] {
+ PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+ PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
+ // Non-partitioned LSM helper factory; MERGE_THRESHOLD is passed to the constant merge policy provider.
+ invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(new FlushControllerProvider(),
+ new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE, MERGE_THRESHOLD),
+ RefCountingOperationTrackerFactory.INSTANCE, ImmediateSchedulerProvider.INSTANCE);
}
- public void createPrimaryIndex() throws Exception {
- JobSpecification spec = new JobSpecification();
- TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
- TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
- lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories,
- btreeDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
- spec.addRoot(primaryCreateOp);
- runTest(spec);
- }
-
- public void createInvertedIndex() throws Exception {
- JobSpecification spec = new JobSpecification();
- ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
- LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
- storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits, tokenComparatorFactories,
- invListsTypeTraits, invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory,
- localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexCreateOp, NC1_ID);
- spec.addRoot(invIndexCreateOp);
- runTest(spec);
- }
-
- @Test
- public void testConjunctiveSearcher() throws Exception {
- IInvertedIndexSearchModifierFactory conjunctiveSearchModifierFactory = new ConjunctiveSearchModifierFactory();
- searchInvertedIndex("of", conjunctiveSearchModifierFactory);
- searchInvertedIndex("3d", conjunctiveSearchModifierFactory);
- searchInvertedIndex("of the human", conjunctiveSearchModifierFactory);
- }
-
- private IOperatorDescriptor createFileScanOp(JobSpecification spec) {
- FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/cleanednumbereddblptitles.txt"))) };
- IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
- RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
- FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
- new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
- UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
- return dblpTitleScanner;
- }
-
- private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
- int[] fieldPermutation = { 0, 1 };
- TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
- primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
- return primaryBtreeBulkLoad;
- }
-
- private IOperatorDescriptor createScanKeyProviderOp(JobSpecification spec) throws HyracksDataException {
- // build dummy tuple containing nothing
- ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
- return keyProviderOp;
- }
-
- private IOperatorDescriptor createPrimaryScanOp(JobSpecification spec) throws HyracksDataException {
- int[] lowKeyFields = null; // - infinity
- int[] highKeyFields = null; // + infinity
- BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
- primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory, false,
- NoOpOperationCallbackFactory.INSTANCE);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
- return primaryBtreeSearchOp;
- }
-
- private void loadPrimaryIndex() throws Exception {
- JobSpecification spec = new JobSpecification();
- // Assuming that the data is pre-sorted on the key. No need to sort
- // before bulk load.
- IOperatorDescriptor fileScanOp = createFileScanOp(spec);
- IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
- spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
- spec.addRoot(primaryBulkLoad);
- runTest(spec);
- }
-
- private void printPrimaryIndex() throws Exception {
- JobSpecification spec = new JobSpecification();
- IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
- IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
- }
-
- private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
- RecordDescriptor outputRecDesc) {
- ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields,
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
- PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, outputRecDesc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
- return externalSortOp;
- }
-
- private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int[] tokenFields, int[] keyFields) {
- BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
- tokenizerRecDesc, tokenizerFactory, tokenFields, keyFields);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
- return binaryTokenizer;
- }
-
- private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
- LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
- spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
- tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
- tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
- return invIndexBulkLoadOp;
- }
-
- public void loadInvertedIndex() throws Exception {
- JobSpecification spec = new JobSpecification();
- IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
- IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
- int[] tokenFields = { 1 };
- int[] keyFields = { 0 };
- IOperatorDescriptor binaryTokenizerOp = createBinaryTokenizerOp(spec, tokenFields, keyFields);
- int[] sortFields = { 0, 1 };
- IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
- int[] fieldPermutation = { 0, 1 };
- IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
- spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
- spec.addRoot(invIndexBulkLoadOp);
- runTest(spec);
- }
-
- private IOperatorDescriptor createQueryProviderOp(JobSpecification spec, String queryString)
- throws HyracksDataException {
- // Build tuple with exactly one field, which is the query,
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize(queryString, dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] querySerde = { UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor queryRecDesc = new RecordDescriptor(querySerde);
- ConstantTupleSourceOperatorDescriptor queryProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
- queryRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, queryProviderOp, NC1_ID);
- return queryProviderOp;
- }
-
- private IOperatorDescriptor createInvertedIndexSearchOp(JobSpecification spec,
- IInvertedIndexSearchModifierFactory searchModifierFactory) {
- LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(spec,
- 0, storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits,
- tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
- invertedIndexDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc, false,
- NoOpOperationCallbackFactory.INSTANCE);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexSearchOp, NC1_ID);
- return invIndexSearchOp;
- }
-
- public void searchInvertedIndex(String queryString, IInvertedIndexSearchModifierFactory searchModifierFactory)
- throws Exception {
- JobSpecification spec = new JobSpecification();
- IOperatorDescriptor queryProviderOp = createQueryProviderOp(spec, queryString);
- IOperatorDescriptor invIndexSearchOp = createInvertedIndexSearchOp(spec, searchModifierFactory);
- IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
- createTempFile().getAbsolutePath()) });
- IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
- spec.connect(new OneToOneConnectorDescriptor(spec), queryProviderOp, 0, invIndexSearchOp, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), invIndexSearchOp, 0, printer, 0);
- spec.addRoot(printer);
- runTest(spec);
+ @Override
+ protected boolean addNumTokensKey() {
+ return false; // no token-count key: this variant declares a single-field (string-only) token
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index 4706eed..84152d5 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -29,19 +29,23 @@
private static final long serialVersionUID = 1L;
private final IBinaryTokenizerFactory tokenizerFactory;
- // Fields that will be tokenized
- private final int[] tokenFields;
+ // Index of the single input field that will be tokenized.
+ private final int docField;
// operator will append these key fields to each token, e.g., as
// payload for an inverted list
// WARNING: too many key fields can cause significant data blowup.
private final int[] keyFields;
+ // When true, the number of tokens in the document is emitted as an extra field right after each token,
+ // ahead of the key fields. This value is used in partitioned inverted indexes, for example.
+ private final boolean addNumTokensKey;
public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
- IBinaryTokenizerFactory tokenizerFactory, int[] tokenFields, int[] keyFields) {
+ IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey) {
super(spec, 1, 1);
this.tokenizerFactory = tokenizerFactory;
- this.tokenFields = tokenFields;
+ this.docField = docField;
this.keyFields = keyFields;
+ this.addNumTokensKey = addNumTokensKey;
recordDescriptors[0] = recDesc;
}
@@ -49,6 +53,7 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
- getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), tokenFields, keyFields);
+ getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), docField, keyFields,
+ addNumTokensKey);
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 8f2b155..9495516 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -34,8 +34,9 @@
private final IHyracksTaskContext ctx;
private final IBinaryTokenizer tokenizer;
- private final int[] tokenFields;
- private final int[] projFields;
+ private final int docField; // input field whose bytes are handed to the tokenizer
+ private final int[] keyFields; // input fields copied into every output tuple after the token (and token count)
+ private final boolean addNumTokensKey; // when true, a short token count is written right after each token
private final RecordDescriptor inputRecDesc;
private final RecordDescriptor outputRecDesc;
@@ -46,11 +47,13 @@
private ByteBuffer writeBuffer;
public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
- RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int[] tokenFields, int[] projFields) {
+ RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int docField, int[] keyFields,
+ boolean addNumTokensKey) {
this.ctx = ctx;
this.tokenizer = tokenizer;
- this.tokenFields = tokenFields;
- this.projFields = projFields;
+ this.docField = docField;
+ this.keyFields = keyFields;
+ this.addNumTokensKey = addNumTokensKey;
this.inputRecDesc = inputRecDesc;
this.outputRecDesc = outputRecDesc;
}
@@ -69,41 +72,51 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
-
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
-
- for (int j = 0; j < tokenFields.length; j++) {
-
+ short numTokens = 0;
+ if (addNumTokensKey) {
+ // First pass: count the document's tokens. The count is a short, so more than Short.MAX_VALUE tokens would wrap.
tokenizer.reset(
accessor.getBuffer().array(),
accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(i, tokenFields[j]),
- accessor.getFieldLength(i, tokenFields[j]));
-
+ + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
while (tokenizer.hasNext()) {
tokenizer.next();
+ numTokens++;
+ }
+ }
- builder.reset();
- try {
- IToken token = tokenizer.getToken();
- token.serializeToken(builderDos);
+ tokenizer.reset(
+ accessor.getBuffer().array(),
+ accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+ while (tokenizer.hasNext()) {
+ tokenizer.next();
+ // Second pass: build one output tuple per token (token, optional token count, then the key fields).
+ builder.reset();
+ try {
+ IToken token = tokenizer.getToken();
+ token.serializeToken(builderDos);
+ builder.addFieldEndOffset();
+ // Add number of tokens if requested.
+ if (addNumTokensKey) {
+ builder.getDataOutput().writeShort(numTokens);
builder.addFieldEndOffset();
- } catch (IOException e) {
- throw new HyracksDataException(e.getMessage());
}
+ } catch (IOException e) {
+ throw new HyracksDataException(e); // keep the IOException as the cause instead of only its message
+ }
- for (int k = 0; k < projFields.length; k++) {
- builder.addField(accessor, i, projFields[k]);
- }
+ for (int k = 0; k < keyFields.length; k++) {
+ builder.addField(accessor, i, keyFields[k]);
+ }
+ if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ appender.reset(writeBuffer, true);
if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender
- .append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
- throw new IllegalStateException();
- }
+ throw new IllegalStateException();
}
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
new file mode 100644
index 0000000..ea07b24
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.PartitionedLSMInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public final class PartitionedLSMInvertedIndexDataflowHelper extends AbstractLSMIndexDataflowHelper {
+ // Dataflow helper whose createIndexInstance() builds a PartitionedLSMInvertedIndex from the operator descriptor.
+ public PartitionedLSMInvertedIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ int partition, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+ ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) {
+ this(opDesc, ctx, partition, DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES, flushController, mergePolicy,
+ opTrackerFactory, ioScheduler); // delegates with the default in-memory page size and page count
+ }
+ // Full constructor: every argument is passed through unchanged to AbstractLSMIndexDataflowHelper.
+ public PartitionedLSMInvertedIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ int partition, int memPageSize, int memNumPages, ILSMFlushController flushController,
+ ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
+ ILSMIOOperationScheduler ioScheduler) {
+ super(opDesc, ctx, partition, memPageSize, memNumPages, flushController, mergePolicy, opTrackerFactory,
+ ioScheduler);
+ }
+ // Builds the index from new in-memory components plus the storage manager's disk buffer cache and file map.
+ @Override
+ public IIndex createIndexInstance() throws HyracksDataException {
+ IInvertedIndexOperatorDescriptor invIndexOpDesc = (IInvertedIndexOperatorDescriptor) opDesc;
+ try {
+ ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
+ IInMemoryBufferCache memBufferCache = new DualIndexInMemoryBufferCache(new HeapBufferAllocator(),
+ memPageSize, memNumPages);
+ IInMemoryFreePageManager memFreePageManager = new DualIndexInMemoryFreePageManager(memNumPages,
+ metaDataFrameFactory);
+ IBufferCache diskBufferCache = opDesc.getStorageManager().getBufferCache(ctx);
+ IFileMapProvider diskFileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
+ PartitionedLSMInvertedIndex invIndex = InvertedIndexUtils.createPartitionedLSMInvertedIndex(memBufferCache,
+ memFreePageManager, diskFileMapProvider, invIndexOpDesc.getInvListsTypeTraits(),
+ invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(),
+ invIndexOpDesc.getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(),
+ diskBufferCache, ctx.getIOManager(), file.getFile().getPath(), flushController, mergePolicy,
+ opTrackerFactory, ioScheduler);
+ return invIndex;
+ } catch (IndexException e) {
+ throw new HyracksDataException(e); // index creation failures surface as HyracksDataException with the cause kept
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
new file mode 100644
index 0000000..34367db
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
+
+/** Factory that creates {@link PartitionedLSMInvertedIndexDataflowHelper} instances for index operators. */
+public class PartitionedLSMInvertedIndexDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards the flush-controller, merge-policy, operation-tracker and IO-scheduler providers to the superclass. */
+    public PartitionedLSMInvertedIndexDataflowHelperFactory(ILSMFlushControllerProvider flushControllerProvider,
+            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerFactory opTrackerProvider,
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider) {
+        super(flushControllerProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider);
+    }
+    /** Builds the helper for {@code partition}; controller, policy and scheduler are resolved from {@code ctx}. */
+    @Override
+    public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition) {
+        return new PartitionedLSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
+                this.flushControllerProvider.getFlushController(ctx), this.mergePolicyProvider.getMergePolicy(ctx),
+                this.opTrackerFactory, this.ioSchedulerProvider.getIOScheduler(ctx));
+    }
+}