First working version of inverted index search.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@429 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-invertedindex/pom.xml b/hyracks-storage-am-invertedindex/pom.xml
index 9a464bf..6e70ad4 100644
--- a/hyracks-storage-am-invertedindex/pom.xml
+++ b/hyracks-storage-am-invertedindex/pom.xml
@@ -62,7 +62,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.fuzzyjoin</groupId>
   		<artifactId>fuzzyjoin-core</artifactId>
-  		<version>0.0.2-SNAPSHOT</version>
+  		<version>0.0.3-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IInvertedListCursor.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IInvertedListCursor.java
index 73ca734..f03bdfc 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IInvertedListCursor.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IInvertedListCursor.java
@@ -1,9 +1,9 @@
 package edu.uci.ics.hyracks.storage.am.invertedindex.api;
 
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 
 public interface IInvertedListCursor extends Comparable<IInvertedListCursor> {
     void reset(int startPageId, int endPageId, int startOff, int numElements);
@@ -25,7 +25,7 @@
     
     // jump to a specific element
     void positionCursor(int elementIx);
-    boolean containsKey(byte[] searchKey, int keyStartOff, int keyLength, IBinaryComparator cmp);      
+    boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp);      
             
     // for debugging
     String printInvList(ISerializerDeserializer[] serdes) throws HyracksDataException;    
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeElementInvertedListCursor.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeElementInvertedListCursor.java
index 3d28415..9ed0ee1 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeElementInvertedListCursor.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeElementInvertedListCursor.java
@@ -4,11 +4,11 @@
 import java.io.DataInput;
 import java.io.DataInputStream;
 
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -115,7 +115,7 @@
     }
     
     @Override
-    public boolean containsKey(byte[] searchKey, int keyStartOff, int keyLength, IBinaryComparator comparator) {
+    public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) {
         int mid;
         int begin = 0;
         int end = numElements - 1;
@@ -123,7 +123,7 @@
         while (begin <= end) {
             mid = (begin + end) / 2;
             positionCursor(mid);
-            int cmp = comparator.compare(searchKey, keyStartOff, keyLength, getPage().getBuffer().array(), getOffset(), elementSize);
+            int cmp = invListCmp.compare(searchTuple, tuple);
             if (cmp < 0) {
                 end = mid - 1;
             } else if (cmp > 0) {
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeFrameTupleAccessor.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeFrameTupleAccessor.java
new file mode 100644
index 0000000..1c3c9e4
--- /dev/null
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeFrameTupleAccessor.java
@@ -0,0 +1,83 @@
+package edu.uci.ics.hyracks.storage.am.invertedindex.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+
+public class FixedSizeFrameTupleAccessor implements IFrameTupleAccessor {
+
+    private final int frameSize;
+    private ByteBuffer buffer;
+    
+    private final ITypeTrait[] fields;
+    private final int[] fieldStartOffsets;
+    private final int tupleSize;
+    
+    public FixedSizeFrameTupleAccessor(int frameSize, ITypeTrait[] fields) {
+        this.frameSize = frameSize;
+        this.fields = fields;
+        this.fieldStartOffsets = new int[fields.length];
+        this.fieldStartOffsets[0] = 0;
+        for(int i = 1; i < fields.length; i++) {
+            fieldStartOffsets[i] = fieldStartOffsets[i-1] + fields[i-1].getStaticallyKnownDataLength();
+        }
+        
+        int tmp = 0;
+        for(int i = 0; i < fields.length; i++) {
+            tmp += fields[i].getStaticallyKnownDataLength();
+        }
+        tupleSize = tmp;
+    }
+    
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fields.length;
+    }
+
+    @Override
+    public int getFieldEndOffset(int tupleIndex, int fIdx) {
+        return getTupleStartOffset(tupleIndex) + fieldStartOffsets[fIdx] + fields[fIdx].getStaticallyKnownDataLength();
+    }
+
+    @Override
+    public int getFieldLength(int tupleIndex, int fIdx) {
+        return fields[fIdx].getStaticallyKnownDataLength();
+    }
+
+    @Override
+    public int getFieldSlotsLength() {
+        return 0;
+    }
+
+    @Override
+    public int getFieldStartOffset(int tupleIndex, int fIdx) {
+        return tupleIndex * tupleSize + fieldStartOffsets[fIdx];
+    }
+
+    @Override
+    public int getTupleCount() {
+        return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+    }
+
+    @Override
+    public int getTupleEndOffset(int tupleIndex) {
+        return getFieldEndOffset(tupleIndex, fields.length-1);
+    }
+
+    @Override
+    public int getTupleStartOffset(int tupleIndex) {
+        return tupleIndex * tupleSize;
+    }
+
+    @Override
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeFrameTupleAppender.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeFrameTupleAppender.java
new file mode 100644
index 0000000..7d95584
--- /dev/null
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/FixedSizeFrameTupleAppender.java
@@ -0,0 +1,111 @@
+package edu.uci.ics.hyracks.storage.am.invertedindex.impls;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+
+public class FixedSizeFrameTupleAppender {
+
+    private static final int TUPLE_COUNT_SIZE = 4;
+    private final int frameSize;
+    private final int tupleSize;
+    private ByteBuffer buffer;
+    private int tupleCount;
+    private int tupleDataEndOffset;        
+
+    public FixedSizeFrameTupleAppender(int frameSize, ITypeTrait[] fields) {
+        this.frameSize = frameSize;
+        int tmp = 0;
+        for(int i = 0; i < fields.length; i++) {
+            tmp += fields[i].getStaticallyKnownDataLength();
+        }
+        tupleSize = tmp;
+    }
+    
+    public void reset(ByteBuffer buffer, boolean clear) {
+        this.buffer = buffer;
+        if (clear) {
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), 0);
+            tupleCount = 0;
+            tupleDataEndOffset = 0;
+        }
+    }
+    
+    public boolean append(byte[] bytes, int offset) {
+        if (tupleDataEndOffset + tupleSize + TUPLE_COUNT_SIZE <= frameSize) {
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, tupleSize);
+            tupleDataEndOffset += tupleSize;
+            tupleCount++;           
+            return true;
+        }
+        return false;
+    }        
+    
+    public boolean append(byte[] bytes, int offset, int length) {
+        if (tupleDataEndOffset + length + TUPLE_COUNT_SIZE <= frameSize) {
+            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset, length);
+            tupleDataEndOffset += length; 
+            return true;
+        }
+        return false;
+    }
+    
+    public boolean append(int fieldValue) {
+        if (tupleDataEndOffset + 4 + TUPLE_COUNT_SIZE <= frameSize) {
+            buffer.putInt(tupleDataEndOffset, fieldValue);            
+            tupleDataEndOffset += 4;
+            tupleCount++;           
+            return true;
+        }
+        return false;
+    }
+    
+    public boolean append(long fieldValue) {
+        if (tupleDataEndOffset + 8 + TUPLE_COUNT_SIZE <= frameSize) {
+            buffer.putLong(tupleDataEndOffset, fieldValue);            
+            tupleDataEndOffset += 8;
+            tupleCount++;           
+            return true;
+        }
+        return false;
+    }
+    
+    public boolean append(char fieldValue) {
+        if (tupleDataEndOffset + 2 + TUPLE_COUNT_SIZE <= frameSize) {
+            buffer.putLong(tupleDataEndOffset, fieldValue);            
+            tupleDataEndOffset += 2;
+            tupleCount++; 
+            return true;
+        }
+        return false;
+    }
+    
+    public boolean append(byte fieldValue) {
+        if (tupleDataEndOffset + 1 + TUPLE_COUNT_SIZE <= frameSize) {
+            buffer.put(tupleDataEndOffset, fieldValue);            
+            tupleDataEndOffset += 1;
+            tupleCount++;           
+            return true;
+        }
+        return false;
+    }        
+    
+    // returns true if an entire tuple fits
+    // returns false otherwise
+    public boolean hasSpace() {
+        return tupleDataEndOffset + tupleSize + TUPLE_COUNT_SIZE <= frameSize;
+    }
+    
+    public void incrementTupleCount(int count) {
+        buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), buffer.getInt(FrameHelper.getTupleCountOffset(frameSize)) + count);
+    }
+    
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java
index 1998a7c..618c61b 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/TOccurrenceSearcher.java
@@ -26,7 +26,9 @@
 import edu.uci.ics.fuzzyjoin.tokenizer.IToken;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -48,10 +50,11 @@
 public class TOccurrenceSearcher {
 
     private final IHyracksStageletContext ctx;
-    private final ArrayTupleBuilder resultTupleBuilder;
-    private final FrameTupleAppender resultTupleAppender;
-    private final FrameTupleAccessor resultFrameAccessor;
-
+    private final FixedSizeFrameTupleAppender resultFrameTupleApp;
+    private final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
+    private final FixedSizeTupleReference resultTuple;
+    private final int invListKeyLength;
+    
     private List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
     private List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
     private List<ByteBuffer> swap = null;
@@ -66,24 +69,35 @@
         
     private final InvertedIndex invIndex;    
     private final IBinaryTokenizer queryTokenizer;    
-    private final int occurrenceThreshold;
+    private int occurrenceThreshold;
     
     private final int cursorCacheSize = 10;
     private ArrayList<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
     private ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
 
-    public TOccurrenceSearcher(IHyracksStageletContext ctx, InvertedIndex invIndex, IBinaryTokenizer queryTokenizer, int occurrenceThreshold) {
+    public TOccurrenceSearcher(IHyracksStageletContext ctx, InvertedIndex invIndex, IBinaryTokenizer queryTokenizer) {
         this.ctx = ctx;
         this.invIndex = invIndex;
         this.queryTokenizer = queryTokenizer;
-        this.occurrenceThreshold = occurrenceThreshold;
 
         leafFrame = invIndex.getBTree().getLeafFrameFactory().getFrame();
         interiorFrame = invIndex.getBTree().getInteriorFrameFactory().getFrame();
 
         btreeCursor = new RangeSearchCursor(leafFrame);
-        resultTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
-        resultTupleBuilder = new ArrayTupleBuilder(1); // TODO: fix hardcoded
+        ITypeTrait[] invListFields = invIndex.getInvListElementCmp().getTypeTraits();
+        ITypeTrait[] invListFieldsWithCount = new TypeTrait[invListFields.length + 1];
+        int tmp = 0;
+        for(int i = 0; i < invListFields.length; i++) {
+            invListFieldsWithCount[i] = invListFields[i];
+            tmp += invListFields[i].getStaticallyKnownDataLength();
+        }
+        // using an integer for counting occurrences
+        invListFieldsWithCount[invListFields.length] = new TypeTrait(4);
+        invListKeyLength = tmp;
+        
+        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), invListFieldsWithCount);
+        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
+        resultTuple = new FixedSizeTupleReference(invListFieldsWithCount);
         newResultBuffers.add(ctx.allocateFrame());
         prevResultBuffers.add(ctx.allocateFrame());
 
@@ -91,11 +105,7 @@
         btreePred.setLowKeyComparator(searchCmp);
         btreePred.setHighKeyComparator(searchCmp);
         btreePred.setLowKey(searchKey, true);
-        btreePred.setHighKey(searchKey, true);
-
-        ISerializerDeserializer[] valueSerde = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor valueRecDesc = new RecordDescriptor(valueSerde);
-        resultFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), valueRecDesc);
+        btreePred.setHighKey(searchKey, true);                
 
         // pre-create cursor objects
         for (int i = 0; i < cursorCacheSize; i++) {
@@ -151,54 +161,237 @@
         
         BTreeOpContext btreeOpCtx = invIndex.getBTree().createOpContext(TreeIndexOp.TI_SEARCH, leafFrame,
                 interiorFrame, null);
+        
         invListCursors.clear();
-        System.out.println("NUM QUERY TOKENS: " + numQueryTokens);
         for (int i = 0; i < numQueryTokens; i++) {
             searchKey.reset(queryTokenAccessor, i);
             invIndex.openCursor(btreeCursor, btreePred, btreeOpCtx, invListCursorCache.get(i));
-            invListCursors.add(invListCursorCache.get(i));
+            invListCursors.add(invListCursorCache.get(i));            
         }
         Collections.sort(invListCursors);
-
-        ISerializerDeserializer[] invListSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE };
-        for (int i = 0; i < numQueryTokens; i++) {
-            System.out.println("LISTSIZE: " + invListCursors.get(i).getNumElements());
-            
-            invListCursors.get(i).pinPagesSync();
-            String s = invListCursors.get(i).printInvList(invListSerdes);
-            System.out.println(s);
-            invListCursors.get(i).unpinPages();
+        
+        for(int i = 0; i < numQueryTokens; i++) {
+            System.out.println("SIZE: " + i + " " + invListCursors.get(i).getNumElements());
         }
         
+        occurrenceThreshold = numQueryTokens;
+        
         int numPrefixTokens = numQueryTokens - occurrenceThreshold + 1;
-                
-        resultTupleAppender.reset(newResultBuffers.get(0), true);
+        
+        int maxPrevBufIdx = mergePrefixLists(numPrefixTokens, numQueryTokens);        
+        maxPrevBufIdx = mergeSuffixLists(numPrefixTokens, numQueryTokens, maxPrevBufIdx);        
         
         /*
-        try {
-            // append first inverted list to temporary results
-            searchKey.reset(queryTokenAccessor, 0);
-            btree.search(btreeCursor, pred, opCtx);
-            while (btreeCursor.hasNext()) {
-                btreeCursor.next();
-                maxResultBufIdx = appendTupleToNewResults(btreeCursor, maxResultBufIdx);
-            }
-            btreeCursor.close();
-            btreeCursor.reset();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
+        for(int i = 0; i <= maxPrevBufIdx; i++) {
+            ByteBuffer testBuf = newResultBuffers.get(i);
+            resultFrameTupleAcc.reset(testBuf);
+            for(int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
+                System.out.print(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(), resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
+                System.out.print(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(), resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
+            }            
         }
+        System.out.println();
         */
         
+    }        
         
-        resultTupleAppender.reset(newResultBuffers.get(0), true);
-        
+    private int mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws IOException {
+        resultFrameTupleApp.reset(newResultBuffers.get(0), true);
+        int maxPrevBufIdx = 0;
         for(int i = 0; i < numPrefixTokens; i++) {
-            
+            swap = prevResultBuffers;
+            prevResultBuffers = newResultBuffers;
+            newResultBuffers = swap;
+                        
+            invListCursors.get(i).pinPagesSync();
+            maxPrevBufIdx = mergePrefixList(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx, newResultBuffers);
+            invListCursors.get(i).unpinPages();        
+        }          
+                
+        return maxPrevBufIdx;
+    }
+        
+    private int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws IOException {
+        
+        swap = prevResultBuffers;
+        prevResultBuffers = newResultBuffers;
+        newResultBuffers = swap;
+        
+        System.out.println("MERGING SUFFIX LISTS");
+        
+        int newBufIdx = 0;
+        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
+
+        int prevBufIdx = 0;
+        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+        
+        int resultTidx = 0; 
+        
+        MultiComparator invListCmp = invIndex.getInvListElementCmp();
+        
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+        resultFrameTupleApp.reset(newCurrentBuffer, true);
+        
+        for(int i = numPrefixTokens; i < numQueryTokens; i++) {
+            invListCursors.get(i).pinPagesSync();
         }
         
+        try {                          
+            while(resultTidx < resultFrameTupleAcc.getTupleCount()) {
+                
+                resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));            
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0), resultTuple.getFieldStart(resultTuple.getFieldCount()-1)); 
+                for(int i = numPrefixTokens; i < numQueryTokens; i++) {
+                    if(invListCursors.get(i).containsKey(resultTuple, invListCmp)) {
+                        count++;
+                        if(count >= occurrenceThreshold) {
+                            break;
+                        }
+                    }
+                    else {
+                        if(count + numQueryTokens - i + 1 < occurrenceThreshold) {
+                            break; // early termination
+                        }
+                    }
+                }
+
+                // add to final results?
+                if(count >= occurrenceThreshold) {
+                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                }
+
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+        } finally {
+            for(int i = numPrefixTokens; i < numQueryTokens; i++) {
+                invListCursors.get(i).unpinPages();
+            }
+        }
+        
+        return newBufIdx;
+    }
+    
+    private int mergePrefixList(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers, int maxPrevBufIdx, List<ByteBuffer> newResultBuffers) throws IOException {                
+        int newBufIdx = 0;
+        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
+
+        int prevBufIdx = 0;
+        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+        
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0; 
+        
+        MultiComparator invListCmp = invIndex.getInvListElementCmp();
+        
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+        resultFrameTupleApp.reset(newCurrentBuffer, true);
+        
+        while(invListCursor.hasNext() && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+            
+            if(advanceCursor) invListCursor.next();
+            
+            ITupleReference invListTuple = invListCursor.getTuple();
+            
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));            
+            
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0), resultTuple.getFieldStart(resultTuple.getFieldCount()-1)) + 1;
+                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);                
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    int count = 1;
+                    newBufIdx = appendTupleToNewResults(invListTuple, count, newBufIdx);
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0), resultTuple.getFieldStart(resultTuple.getFieldCount()-1));
+                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }            
+        }
+        
+        // append remaining new elements from inverted list
+        //if(invListCursor.hasNext()) System.out.println("APPENDING FROM INV LIST");        
+        while(invListCursor.hasNext()) {            
+            invListCursor.next();            
+            ITupleReference invListTuple = invListCursor.getTuple();
+            newBufIdx = appendTupleToNewResults(invListTuple, 1, newBufIdx);       
+        }
+        
+        // append remaining elements from previous result set
+        //if(resultTidx < resultFrameTupleAcc.getTupleCount()) System.out.println("APPENDING FROM RESULTS");
+        while(resultTidx < resultFrameTupleAcc.getTupleCount()) {                        
+            
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0), resultTuple.getFieldStart(resultTuple.getFieldCount()-1));
+            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+            
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+                
+        return newBufIdx;        
     }        
     
+    private int appendTupleToNewResults(ITupleReference tuple, int newCount, int newBufIdx) throws IOException {                        
+        ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
+        
+        if (!resultFrameTupleApp.hasSpace()) {
+            newBufIdx++;
+            if (newBufIdx >= newResultBuffers.size()) {
+                newResultBuffers.add(ctx.allocateFrame());
+            }
+            newCurrentBuffer = newResultBuffers.get(newBufIdx);
+            resultFrameTupleApp.reset(newCurrentBuffer, true);            
+        }
+        
+        // append key
+        if (!resultFrameTupleApp.append(tuple.getFieldData(0), tuple.getFieldStart(0), invListKeyLength) ) {
+            throw new IllegalStateException();
+        }
+        
+        // append new count
+        if (!resultFrameTupleApp.append(newCount) ) {
+            throw new IllegalStateException();
+        }
+
+        resultFrameTupleApp.incrementTupleCount(1);
+        
+        return newBufIdx;
+    }
+    
    
     /*
     private int appendTupleToNewResults(IBTreeCursor btreeCursor, int newBufIdx) throws IOException {
diff --git a/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/BulkLoadTest.java b/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/BulkLoadTest.java
index 17f45e4..90cbb03 100644
--- a/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/BulkLoadTest.java
+++ b/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/BulkLoadTest.java
@@ -57,15 +57,15 @@
 
 public class BulkLoadTest extends AbstractInvIndexTest {
 	// testing params
-    private static final int PAGE_SIZE = 256;
-    private static final int NUM_PAGES = 10;
-    private static final int HYRACKS_FRAME_SIZE = 256;
+    //private static final int PAGE_SIZE = 256;
+    //private static final int NUM_PAGES = 100;
+    //private static final int HYRACKS_FRAME_SIZE = 256;
 
     // realistic params
     // private static final int PAGE_SIZE = 65536;
-    //private static final int PAGE_SIZE = 32768;
-    //private static final int NUM_PAGES = 10;
-    //private static final int HYRACKS_FRAME_SIZE = 32768;
+    private static final int PAGE_SIZE = 32768;
+    private static final int NUM_PAGES = 10000;
+    private static final int HYRACKS_FRAME_SIZE = 32768;
     private IHyracksStageletContext stageletCtx = TestUtils.create(HYRACKS_FRAME_SIZE);    
 
     @Test
@@ -160,7 +160,7 @@
         tokens.add("systems");
         tokens.add("university");      
         
-        int maxId = 1000;
+        int maxId = 100;
         int addProb = 0;
         int addProbStep = 2;        
 
@@ -221,7 +221,8 @@
         queryAccessor.reset(frame);
         FrameTupleReference queryTuple = new FrameTupleReference();
 
-        String query = "computer hyracks fast blubb";
+        //String query = "computer hyracks fast";
+        String query = "compilers fast university hyracks";
         
         ITokenFactory tokenFactory = new UTF8WordTokenFactory();
         IBinaryTokenizer queryTokenizer = new DelimitedUTF8StringBinaryTokenizer(true, false, tokenFactory);
@@ -239,14 +240,16 @@
         DataInput dataIn = new DataInputStream(inStream);
         Object o = serde.deserialize(dataIn);
         System.out.println(o.toString());
-        
-        
-        TOccurrenceSearcher searcher = new TOccurrenceSearcher(stageletCtx, invIndex, queryTokenizer, 1);
+                
+        TOccurrenceSearcher searcher = new TOccurrenceSearcher(stageletCtx, invIndex, queryTokenizer);
 
-        long timeStart = System.currentTimeMillis();
-        searcher.search(queryTuple, 0);
-        long timeEnd = System.currentTimeMillis();
-        System.out.println("SEARCH TIME: " + (timeEnd - timeStart) + "ms");
+        int repeats = 10;
+        for(int i = 0; i < repeats; i++) {
+        	long timeStart = System.currentTimeMillis();
+        	searcher.search(queryTuple, 0);
+        	long timeEnd = System.currentTimeMillis();
+        	System.out.println("SEARCH TIME: " + (timeEnd - timeStart) + "ms");
+        }
         
         
         
diff --git a/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/FixedSizeFrameTupleTest.java b/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/FixedSizeFrameTupleTest.java
new file mode 100644
index 0000000..3d295e1
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/invertedindex/FixedSizeFrameTupleTest.java
@@ -0,0 +1,55 @@
+package edu.uci.ics.hyracks.storage.am.invertedindex;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.invertedindex.impls.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.invertedindex.impls.FixedSizeFrameTupleAppender;
+
+public class FixedSizeFrameTupleTest {
+
+	private static int FRAME_SIZE = 4096;
+	
+	private Random rnd = new Random(50);
+	
+	@Test
+    public void singleFieldTest() throws Exception {
+		ByteBuffer buffer = ByteBuffer.allocate(FRAME_SIZE);
+		
+		ITypeTrait[] fields = new TypeTrait[1];
+		fields[0] = new TypeTrait(4);
+		
+		FixedSizeFrameTupleAppender ftapp = new FixedSizeFrameTupleAppender(FRAME_SIZE, fields);
+		FixedSizeFrameTupleAccessor ftacc = new FixedSizeFrameTupleAccessor(FRAME_SIZE, fields);
+		
+		boolean frameHasSpace = true;		
+		
+		ArrayList<Integer> check = new ArrayList<Integer>();
+		
+		ftapp.reset(buffer, true);
+		while(frameHasSpace) {			
+			int val = rnd.nextInt();
+			frameHasSpace = ftapp.append(val);
+			if(frameHasSpace) {
+				check.add(val);
+				ftapp.incrementTupleCount(1);
+			}
+		}
+		
+		ftacc.reset(buffer);
+		System.out.println("TUPLECOUNT: " + ftacc.getTupleCount());
+		System.out.println("CHECKCOUNT: " + check.size());
+		for(int i = 0; i < ftacc.getTupleCount(); i++) {
+			int val = IntegerSerializerDeserializer.INSTANCE.getInt(ftacc.getBuffer().array(), ftacc.getTupleStartOffset(i));			
+			Assert.assertEquals(check.get(i).intValue(), val);						
+		}		
+	}
+}