fixing format issues
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
index 01e34b0..6e80fe6 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/ListItemBinaryComparatorFactory.java
@@ -37,10 +37,11 @@
 
     @Override
     public IBinaryComparator createBinaryComparator() {
-    	return createBinaryComparator(ATypeTag.NULL, ATypeTag.NULL, false);
+        return createBinaryComparator(ATypeTag.NULL, ATypeTag.NULL, false);
     }
-    
-    public IBinaryComparator createBinaryComparator(final ATypeTag firstItemTypeTag, final ATypeTag secondItemTypeTag, final boolean ignoreCase) {
+
+    public IBinaryComparator createBinaryComparator(final ATypeTag firstItemTypeTag, final ATypeTag secondItemTypeTag,
+            final boolean ignoreCase) {
         return new IBinaryComparator() {
             final IBinaryComparator ascBoolComp = BooleanBinaryComparatorFactory.INSTANCE.createBinaryComparator();
             final IBinaryComparator ascIntComp = new PointableBinaryComparatorFactory(IntegerPointable.FACTORY)
@@ -48,8 +49,8 @@
             final IBinaryComparator ascLongComp = LongBinaryComparatorFactory.INSTANCE.createBinaryComparator();
             final IBinaryComparator ascStrComp = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY)
                     .createBinaryComparator();
-            final IBinaryComparator ascLowerCaseStrComp =  new PointableBinaryComparatorFactory(UTF8StringLowercasePointable.FACTORY)
-            		.createBinaryComparator();
+            final IBinaryComparator ascLowerCaseStrComp = new PointableBinaryComparatorFactory(
+                    UTF8StringLowercasePointable.FACTORY).createBinaryComparator();
             final IBinaryComparator ascFloatComp = new PointableBinaryComparatorFactory(FloatPointable.FACTORY)
                     .createBinaryComparator();
             final IBinaryComparator ascDoubleComp = new PointableBinaryComparatorFactory(DoublePointable.FACTORY)
@@ -83,23 +84,23 @@
                     if (b2[s2] == ATypeTag.NULL.serialize())
                         return 1;
                 }
-                
+
                 ATypeTag tag1 = firstItemTypeTag;
                 int skip1 = 0;
                 if (firstItemTypeTag == ATypeTag.ANY) {
-                	tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
-                	skip1 = 1;
+                    tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
+                    skip1 = 1;
                 }
-                
+
                 ATypeTag tag2 = secondItemTypeTag;
                 int skip2 = 0;
                 if (secondItemTypeTag == ATypeTag.ANY) {
-                	tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
-                	skip2 = 1;
+                    tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
+                    skip2 = 1;
                 }
-                
+
                 if (tag1 != tag2) {
-                	return rawComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
+                    return rawComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
                 }
 
                 switch (tag1) {
@@ -124,11 +125,11 @@
                         return ascDoubleComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
                     }
                     case STRING: {
-                    	if (ignoreCase) {
-                    		return ascLowerCaseStrComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
-                    	} else {
-                    		return ascStrComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
-                    	}
+                        if (ignoreCase) {
+                            return ascLowerCaseStrComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
+                        } else {
+                            return ascStrComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
+                        }
                     }
                     case RECTANGLE: {
                         return ascRectangleComp.compare(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/hash/ListItemBinaryHashFunctionFactory.java b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/hash/ListItemBinaryHashFunctionFactory.java
index cbcdcc7..0fab7de 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/hash/ListItemBinaryHashFunctionFactory.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/hash/ListItemBinaryHashFunctionFactory.java
@@ -28,9 +28,8 @@
 
 /**
  * This hash function factory is introduced to be able to hash heterogeneous list items.
- * The item type tag is also included in the hash computation to distinguish the different 
+ * The item type tag is also included in the hash computation to distinguish the different
  * types with the same raw bytes.
- * 
  */
 public class ListItemBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
 
@@ -43,52 +42,52 @@
 
     @Override
     public IBinaryHashFunction createBinaryHashFunction() {
-    	return createBinaryHashFunction(ATypeTag.ANY, false);
+        return createBinaryHashFunction(ATypeTag.ANY, false);
     }
-    
+
     public IBinaryHashFunction createBinaryHashFunction(final ATypeTag itemTypeTag, final boolean ignoreCase) {
         return new IBinaryHashFunction() {
-        	
-            private IBinaryHashFunction lowerCaseStringHash = new PointableBinaryHashFunctionFactory(UTF8StringLowercasePointable.FACTORY)
-            		.createBinaryHashFunction();
+
+            private IBinaryHashFunction lowerCaseStringHash = new PointableBinaryHashFunctionFactory(
+                    UTF8StringLowercasePointable.FACTORY).createBinaryHashFunction();
             private IBinaryHashFunction genericBinaryHash = MurmurHash3BinaryHashFunctionFamily.INSTANCE
-                    .createBinaryHashFunction(0);           
+                    .createBinaryHashFunction(0);
             private GrowableArray taggedBytes = new GrowableArray();
 
             @Override
             public int hash(byte[] bytes, int offset, int length) {
-            	ATypeTag tag = itemTypeTag;
-            	int skip = 0;
-            	if (itemTypeTag == ATypeTag.ANY) {
-            		tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[offset]);
-            		skip = 1;
-            	}
+                ATypeTag tag = itemTypeTag;
+                int skip = 0;
+                if (itemTypeTag == ATypeTag.ANY) {
+                    tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[offset]);
+                    skip = 1;
+                }
                 switch (tag) {
                     case STRING: {
-                    	if (ignoreCase) {
-                    		return lowerCaseStringHash.hash(bytes, offset + skip, length - skip);
-                    	}
+                        if (ignoreCase) {
+                            return lowerCaseStringHash.hash(bytes, offset + skip, length - skip);
+                        }
                     }
                     default: {
-                    	if (itemTypeTag != ATypeTag.ANY) {
-                    		// add the itemTypeTag in front of the data
-                    		try {
-                    			resetTaggedBytes(bytes, offset, length);
-                    			return genericBinaryHash.hash(taggedBytes.getByteArray(), 0, length + 1);
-							} catch (IOException e) {
-								throw new RuntimeException(e);
-							}
-                    	} else {
-                    		return genericBinaryHash.hash(bytes, offset, length);
-                    	}
+                        if (itemTypeTag != ATypeTag.ANY) {
+                            // add the itemTypeTag in front of the data
+                            try {
+                                resetTaggedBytes(bytes, offset, length);
+                                return genericBinaryHash.hash(taggedBytes.getByteArray(), 0, length + 1);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        } else {
+                            return genericBinaryHash.hash(bytes, offset, length);
+                        }
                     }
                 }
             }
-            
-            public void resetTaggedBytes(byte[] data, int offset, int length) throws IOException {
-            	taggedBytes.reset();
-            	taggedBytes.getDataOutput().writeByte(itemTypeTag.serialize());
-            	taggedBytes.getDataOutput().write(data, offset, length);
+
+            private void resetTaggedBytes(byte[] data, int offset, int length) throws IOException {
+                taggedBytes.reset();
+                taggedBytes.getDataOutput().writeByte(itemTypeTag.serialize());
+                taggedBytes.getDataOutput().write(data, offset, length);
             }
         };
     }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
index 2f9cfc7..269d363 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
@@ -47,19 +47,19 @@
     public int getPos() {
         return pos;
     }
-    
+
     public int getItemLen() {
-    	return itemLen;
+        return itemLen;
     }
 
     @Override
     public void next() {
         try {
-        	pos = nextPos;
-        	++count;
+            pos = nextPos;
+            ++count;
             nextPos = startOff + listLength;
             if (count + 1 < numberOfItems) {
-            	nextPos = getItemOffset(data, startOff, count + 1);
+                nextPos = getItemOffset(data, startOff, count + 1);
             }
             itemLen = nextPos - pos;
         } catch (AsterixException e) {
@@ -74,7 +74,7 @@
             pos = getItemOffset(data, startOff, count);
             nextPos = startOff + listLength;
             if (count + 1 < numberOfItems) {
-            	nextPos = getItemOffset(data, startOff, count + 1);
+                nextPos = getItemOffset(data, startOff, count + 1);
             }
             itemLen = nextPos - pos;
         } catch (AsterixException e) {
@@ -121,6 +121,6 @@
     protected abstract int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException;
 
     protected abstract int getNumberOfItems(byte[] serOrderedList, int offset);
-    
+
     protected abstract int getListLength(byte[] serOrderedList, int offset);
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixOrderedListIterator.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixOrderedListIterator.java
index 6a73b6d..fc92875 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixOrderedListIterator.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixOrderedListIterator.java
@@ -15,8 +15,8 @@
         return AOrderedListSerializerDeserializer.getNumberOfItems(serOrderedList, offset);
     }
 
-	@Override
-	protected int getListLength(byte[] serOrderedList, int offset) {
-		return AOrderedListSerializerDeserializer.getOrderedListLength(serOrderedList, offset + 1);
-	}
+    @Override
+    protected int getListLength(byte[] serOrderedList, int offset) {
+        return AOrderedListSerializerDeserializer.getOrderedListLength(serOrderedList, offset + 1);
+    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixUnorderedListIterator.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixUnorderedListIterator.java
index 90d9ae3..5f01581 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixUnorderedListIterator.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/AsterixUnorderedListIterator.java
@@ -15,8 +15,8 @@
         return AUnorderedListSerializerDeserializer.getNumberOfItems(serOrderedList, offset);
     }
 
-	@Override
-	protected int getListLength(byte[] serOrderedList, int offset) {
-		return AUnorderedListSerializerDeserializer.getUnorderedListLength(serOrderedList, offset + 1);
-	}
+    @Override
+    protected int getListLength(byte[] serOrderedList, int offset) {
+        return AUnorderedListSerializerDeserializer.getUnorderedListLength(serOrderedList, offset + 1);
+    }
 }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardCheckEvaluator.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardCheckEvaluator.java
index 11c4d6b..b732f40 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardCheckEvaluator.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardCheckEvaluator.java
@@ -63,7 +63,7 @@
             byte[] buf = probeIter.getData();
             int off = probeIter.getPos();
             int len = probeIter.getItemLen();
-            keyEntry.set(buf, off, len);            
+            keyEntry.set(buf, off, len);
             BinaryEntry entry = hashMap.get(keyEntry);
             if (entry != null) {
                 // Increment second value.
@@ -94,7 +94,7 @@
         }
         return intersectionSize;
     }
-    
+
     @Override
     protected void writeResult(float jacc) throws IOException {
         listBuilder.reset(listType);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardEvaluator.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardEvaluator.java
index c610bde..391fb30 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardEvaluator.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/common/SimilarityJaccardEvaluator.java
@@ -105,7 +105,7 @@
 
         firstTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut.getByteArray()[firstStart]);
         secondTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut.getByteArray()[secondStart]);
-        
+
         firstItemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut.getByteArray()[firstStart + 1]);
         secondItemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(argOut.getByteArray()[secondStart + 1]);
     }
@@ -201,13 +201,13 @@
             hashMap.clear();
             return;
         }
-      
-        IBinaryHashFunction putHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE
-				.createBinaryHashFunction(buildItemTypeTag, ignoreCase);
-        IBinaryHashFunction getHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE
-				.createBinaryHashFunction(probeItemTypeTag, ignoreCase);
-        IBinaryComparator cmp = ListItemBinaryComparatorFactory.INSTANCE
-				.createBinaryComparator(buildItemTypeTag, probeItemTypeTag, ignoreCase);
+
+        IBinaryHashFunction putHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction(
+                buildItemTypeTag, ignoreCase);
+        IBinaryHashFunction getHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction(
+                probeItemTypeTag, ignoreCase);
+        IBinaryComparator cmp = ListItemBinaryComparatorFactory.INSTANCE.createBinaryComparator(buildItemTypeTag,
+                probeItemTypeTag, ignoreCase);
         hashMap = new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc, getHashFunc, cmp);
     }
 
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/BinaryHashMap.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/BinaryHashMap.java
index f69a54e..6367996 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/BinaryHashMap.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/evaluators/functions/BinaryHashMap.java
@@ -20,157 +20,154 @@
  * Intended to work with binary data and be able to map arbitrary key types to
  * arbitrary value types, given that they have implementations of
  * IBinaryHashFunction and IBinaryComparator.
- * 
  * Uses 2 bytes each to indicate the length of the key and the value.
  * Uses 8 byte pointers for the linked list (4 bytes frame index, 4 bytes frame offset).
- * 
  * This class is NOT thread safe.
- * 
  */
 public class BinaryHashMap {
-	// Special value to indicate an empty "bucket" in the header array.
-	private static final long NULL_PTR = -1;
-	private static final int PTR_SIZE = 8;
-	private static final int SLOT_SIZE = 2;
-	private static final int ENTRY_HEADER_SIZE = PTR_SIZE + 2 * SLOT_SIZE;
-	private final IBinaryHashFunction putHashFunc;
-	private final IBinaryHashFunction getHashFunc;
-	private final IBinaryComparator cmp;
-	private final BinaryEntry returnValue = new BinaryEntry();
-	
-	private final long[] listHeads;
-	private final int frameSize;
-	private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
-	private int currFrameIndex;
-	private int nextOff;
-	private int size;
-	
-	// Can be used for key or value.
-	public static class BinaryEntry {
-		public byte[] buf;
-		public int off;
-		public int len;
-		
-		public void set(byte[] buf, int off, int len) {
-			this.buf = buf;
-			this.off = off;
-			this.len = len;
-		}
-		
-		// Inefficient. Just for debugging.
-		@SuppressWarnings("rawtypes")
-		public String print(ISerializerDeserializer serde) throws HyracksDataException {
-			ByteArrayInputStream inStream = new ByteArrayInputStream(buf, off, len);
+    // Special value to indicate an empty "bucket" in the header array.
+    private static final long NULL_PTR = -1;
+    private static final int PTR_SIZE = 8;
+    private static final int SLOT_SIZE = 2;
+    private static final int ENTRY_HEADER_SIZE = PTR_SIZE + 2 * SLOT_SIZE;
+    private final IBinaryHashFunction putHashFunc;
+    private final IBinaryHashFunction getHashFunc;
+    private final IBinaryComparator cmp;
+    private final BinaryEntry returnValue = new BinaryEntry();
+
+    private final long[] listHeads;
+    private final int frameSize;
+    private final List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+    private int currFrameIndex;
+    private int nextOff;
+    private int size;
+
+    // Can be used for key or value.
+    public static class BinaryEntry {
+        public byte[] buf;
+        public int off;
+        public int len;
+
+        public void set(byte[] buf, int off, int len) {
+            this.buf = buf;
+            this.off = off;
+            this.len = len;
+        }
+
+        // Inefficient. Just for debugging.
+        @SuppressWarnings("rawtypes")
+        public String print(ISerializerDeserializer serde) throws HyracksDataException {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(buf, off, len);
             DataInput dataIn = new DataInputStream(inStream);
             return serde.deserialize(dataIn).toString();
-		}
-	}
-	
-	public BinaryHashMap(int tableSize, int frameSize, IBinaryHashFunction putHashFunc, IBinaryHashFunction getHashFunc, IBinaryComparator cmp) {
-		listHeads = new long[tableSize];
-		this.frameSize = frameSize;		
-		this.putHashFunc = putHashFunc;
-		this.getHashFunc = getHashFunc;
-		this.cmp = cmp;
-		frames.add(ByteBuffer.allocate(frameSize));
-		clear();
-	}
-	
-	/**
-	 * Inserts key, value into the hash map. If key already exists, returns
-	 * existing entry. Otherwise, returns null.
-	 * 
-	 * @param key
-	 * @param value
-	 * @return
-	 */
-	public BinaryEntry put(BinaryEntry key, BinaryEntry value) {
-		return getPutInternal(key, value, true);
-	}
-	
-	/**
-	 * Retrieves value for given key. Returns null if key doesn't exist.
-	 * 
-	 * @param key
-	 * @param value
-	 * @return
-	 */
-	public BinaryEntry get(BinaryEntry key) {
-		return getPutInternal(key, null, false);
-	}
-	
-	private BinaryEntry getPutInternal(BinaryEntry key, BinaryEntry value, boolean put) {
-		int bucket;
-		if (put) {
-			bucket = Math.abs(putHashFunc.hash(key.buf, key.off, key.len) % listHeads.length);
-		} else {
-			bucket = Math.abs(getHashFunc.hash(key.buf, key.off, key.len) % listHeads.length);
-		}
-		long headPtr = listHeads[bucket];
-		if (headPtr == NULL_PTR) {
-			// Key definitely doesn't exist yet.
-			if (put) {
-				listHeads[bucket] = appendEntry(key, value);
-			}
-			return null;
-		}
-		// Follow the chain until we found an entry matching the given key.
-		int frameOff;
-		ByteBuffer frame;
-		do {				
-			int frameIndex = getFrameIndex(headPtr);
-			frameOff = getFrameOffset(headPtr);
-			frame = frames.get(frameIndex);
-			int entryKeyOff = frameOff + ENTRY_HEADER_SIZE;
-			int entryKeyLen = frame.getShort(frameOff);
-			if (cmp.compare(frame.array(), entryKeyOff, entryKeyLen, key.buf,
-					key.off, key.len) == 0) {
-				// Key found, set values and return.
-				int entryValOff = frameOff + ENTRY_HEADER_SIZE + entryKeyLen;
-				int entryValLen = frame.getShort(frameOff + SLOT_SIZE);
-				returnValue.set(frame.array(), entryValOff, entryValLen);
-				return returnValue;
-			}
-			headPtr = frame.getLong(frameOff + 2 * SLOT_SIZE);
-		} while (headPtr != NULL_PTR);
-		// We've followed the chain to its end, and didn't find the key.
-		if (put) {
-			// Append the new entry, and set a pointer to it in the last entry we've checked.
-			long newPtr = appendEntry(key, value);
-			frame.putLong(frameOff + 2 * SLOT_SIZE, newPtr);
-		}
-		return null;
-	}
-	
-	public long appendEntry(BinaryEntry key, BinaryEntry value) {
-		ByteBuffer frame = frames.get(currFrameIndex);
-		int requiredSpace = key.len + value.len + ENTRY_HEADER_SIZE;
-		if (nextOff + requiredSpace >= frameSize) {
-			// Entry doesn't fit on frame, allocate a new one.
-			if (requiredSpace > frameSize) {
-				throw new IllegalStateException("Key and value greater than framesize.");
-			}
-			frames.add(ByteBuffer.allocate(frameSize));
-			currFrameIndex++;
-			nextOff = 0;
-			frame = frames.get(currFrameIndex);
-		}
-		writeEntryHeader(frame, nextOff, key.len, value.len, NULL_PTR);
-		System.arraycopy(key.buf, key.off, frame.array(), nextOff + ENTRY_HEADER_SIZE, key.len);
-		System.arraycopy(value.buf, value.off, frame.array(), nextOff + ENTRY_HEADER_SIZE + key.len, value.len);
-		long entryPtr = getEntryPtr(currFrameIndex, nextOff);
-		nextOff += requiredSpace;
-		size++;
-		return entryPtr;
-	}
-	
-	private void writeEntryHeader(ByteBuffer frame, int targetOff, int keyLen, int valLen, long ptr) {
-		frame.putShort(targetOff, (short) keyLen);
-		frame.putShort(targetOff + SLOT_SIZE, (short) valLen);
-		frame.putLong(targetOff + 2 * SLOT_SIZE, ptr);
-	}
+        }
+    }
 
-	private long getEntryPtr(int frameIndex, int frameOff) {
+    public BinaryHashMap(int tableSize, int frameSize, IBinaryHashFunction putHashFunc,
+            IBinaryHashFunction getHashFunc, IBinaryComparator cmp) {
+        listHeads = new long[tableSize];
+        this.frameSize = frameSize;
+        this.putHashFunc = putHashFunc;
+        this.getHashFunc = getHashFunc;
+        this.cmp = cmp;
+        frames.add(ByteBuffer.allocate(frameSize));
+        clear();
+    }
+
+    /**
+     * Inserts key, value into the hash map. If key already exists, returns
+     * existing entry. Otherwise, returns null.
+     * 
+     * @param key
+     * @param value
+     * @return
+     */
+    public BinaryEntry put(BinaryEntry key, BinaryEntry value) {
+        return getPutInternal(key, value, true);
+    }
+
+    /**
+     * Retrieves value for given key. Returns null if key doesn't exist.
+     * 
+     * @param key
+     * @param value
+     * @return
+     */
+    public BinaryEntry get(BinaryEntry key) {
+        return getPutInternal(key, null, false);
+    }
+
+    private BinaryEntry getPutInternal(BinaryEntry key, BinaryEntry value, boolean put) {
+        int bucket;
+        if (put) {
+            bucket = Math.abs(putHashFunc.hash(key.buf, key.off, key.len) % listHeads.length);
+        } else {
+            bucket = Math.abs(getHashFunc.hash(key.buf, key.off, key.len) % listHeads.length);
+        }
+        long headPtr = listHeads[bucket];
+        if (headPtr == NULL_PTR) {
+            // Key definitely doesn't exist yet.
+            if (put) {
+                listHeads[bucket] = appendEntry(key, value);
+            }
+            return null;
+        }
+        // Follow the chain until we found an entry matching the given key.
+        int frameOff;
+        ByteBuffer frame;
+        do {
+            int frameIndex = getFrameIndex(headPtr);
+            frameOff = getFrameOffset(headPtr);
+            frame = frames.get(frameIndex);
+            int entryKeyOff = frameOff + ENTRY_HEADER_SIZE;
+            int entryKeyLen = frame.getShort(frameOff);
+            if (cmp.compare(frame.array(), entryKeyOff, entryKeyLen, key.buf, key.off, key.len) == 0) {
+                // Key found, set values and return.
+                int entryValOff = frameOff + ENTRY_HEADER_SIZE + entryKeyLen;
+                int entryValLen = frame.getShort(frameOff + SLOT_SIZE);
+                returnValue.set(frame.array(), entryValOff, entryValLen);
+                return returnValue;
+            }
+            headPtr = frame.getLong(frameOff + 2 * SLOT_SIZE);
+        } while (headPtr != NULL_PTR);
+        // We've followed the chain to its end, and didn't find the key.
+        if (put) {
+            // Append the new entry, and set a pointer to it in the last entry we've checked.
+            long newPtr = appendEntry(key, value);
+            frame.putLong(frameOff + 2 * SLOT_SIZE, newPtr);
+        }
+        return null;
+    }
+
+    public long appendEntry(BinaryEntry key, BinaryEntry value) {
+        ByteBuffer frame = frames.get(currFrameIndex);
+        int requiredSpace = key.len + value.len + ENTRY_HEADER_SIZE;
+        if (nextOff + requiredSpace >= frameSize) {
+            // Entry doesn't fit on frame, allocate a new one.
+            if (requiredSpace > frameSize) {
+                throw new IllegalStateException("Key and value greater than framesize.");
+            }
+            frames.add(ByteBuffer.allocate(frameSize));
+            currFrameIndex++;
+            nextOff = 0;
+            frame = frames.get(currFrameIndex);
+        }
+        writeEntryHeader(frame, nextOff, key.len, value.len, NULL_PTR);
+        System.arraycopy(key.buf, key.off, frame.array(), nextOff + ENTRY_HEADER_SIZE, key.len);
+        System.arraycopy(value.buf, value.off, frame.array(), nextOff + ENTRY_HEADER_SIZE + key.len, value.len);
+        long entryPtr = getEntryPtr(currFrameIndex, nextOff);
+        nextOff += requiredSpace;
+        size++;
+        return entryPtr;
+    }
+
+    private void writeEntryHeader(ByteBuffer frame, int targetOff, int keyLen, int valLen, long ptr) {
+        frame.putShort(targetOff, (short) keyLen);
+        frame.putShort(targetOff + SLOT_SIZE, (short) valLen);
+        frame.putLong(targetOff + 2 * SLOT_SIZE, ptr);
+    }
+
+    private long getEntryPtr(int frameIndex, int frameOff) {
         return (((long) frameIndex) << 32) + frameOff;
     }
 
@@ -182,93 +179,94 @@
         return (int) (ptr & 0xffffffff);
     }
 
-	public int size() {
-		return size;
-	}
+    public int size() {
+        return size;
+    }
 
-	public boolean isEmpty() {
-		return size > 0;
-	}
+    public boolean isEmpty() {
+        return size > 0;
+    }
 
-	public void clear() {
-		// Initialize all entries to point to nothing.
-		Arrays.fill(listHeads, NULL_PTR);
-		currFrameIndex = 0;
-		nextOff = 0;
-		size = 0;
-	}
-	
-	public Iterator<Pair<BinaryEntry, BinaryEntry>> iterator() {
-		return new BinaryHashMapIterator();
-	}
-	
-	public class BinaryHashMapIterator implements Iterator<Pair<BinaryEntry, BinaryEntry> > {
-		private final Pair<BinaryEntry, BinaryEntry> val = new Pair<BinaryEntry, BinaryEntry>(new BinaryEntry(), new BinaryEntry());
-		private int listHeadIndex;
-		private ByteBuffer frame;
-		private int frameIndex;
-		private int frameOff;
-		
-		public BinaryHashMapIterator() {
-			listHeadIndex = 0;
-			frame = null;
-			frameIndex = -1;
-			frameOff = -1;
-		}
-		
-		@Override
-		public boolean hasNext() {
-			if (frame != null) {
-				long nextPtr = frame.getLong(frameOff + 2 * SLOT_SIZE);
-				if (nextPtr == NULL_PTR) {
-					// End of current list.
-					listHeadIndex++;
-					return nextListHead();
-				} else {
-					// Follow pointer.
-					setValue(nextPtr);
-					return true;
-				}
-			}
-			return nextListHead();
-		}
+    public void clear() {
+        // Initialize all entries to point to nothing.
+        Arrays.fill(listHeads, NULL_PTR);
+        currFrameIndex = 0;
+        nextOff = 0;
+        size = 0;
+    }
 
-		private boolean nextListHead() {
-			// Position to first non-null list-head pointer.
-			while(listHeadIndex < listHeads.length && listHeads[listHeadIndex] == NULL_PTR) {
-				listHeadIndex++;
-			}
-			if (listHeadIndex < listHeads.length) {
-				// Positioned to first non-null list head.
-				setValue(listHeads[listHeadIndex]);
-				return true;
-			} else {
-				// No more lists.
-				frame = null;
-				return false;
-			}
-		}
-		
-		private void setValue(long ptr) {
-			frameIndex = getFrameIndex(ptr);
-			frameOff = getFrameOffset(ptr);
-			frame = frames.get(frameIndex);
-			int entryKeyOff = frameOff + ENTRY_HEADER_SIZE;
-			int entryKeyLen = frame.getShort(frameOff);
-			int entryValOff = frameOff + ENTRY_HEADER_SIZE + entryKeyLen;
-			int entryValLen = frame.getShort(frameOff + SLOT_SIZE);
-			val.first.set(frame.array(), entryKeyOff, entryKeyLen);
-			val.second.set(frame.array(), entryValOff, entryValLen);
-		}
-		
-		@Override
-		public Pair<BinaryEntry, BinaryEntry> next() {
-			return val;
-		}
+    public Iterator<Pair<BinaryEntry, BinaryEntry>> iterator() {
+        return new BinaryHashMapIterator();
+    }
 
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException("Remove not implemented");
-		}
-	}
+    public class BinaryHashMapIterator implements Iterator<Pair<BinaryEntry, BinaryEntry>> {
+        private final Pair<BinaryEntry, BinaryEntry> val = new Pair<BinaryEntry, BinaryEntry>(new BinaryEntry(),
+                new BinaryEntry());
+        private int listHeadIndex;
+        private ByteBuffer frame;
+        private int frameIndex;
+        private int frameOff;
+
+        public BinaryHashMapIterator() {
+            listHeadIndex = 0;
+            frame = null;
+            frameIndex = -1;
+            frameOff = -1;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (frame != null) {
+                long nextPtr = frame.getLong(frameOff + 2 * SLOT_SIZE);
+                if (nextPtr == NULL_PTR) {
+                    // End of current list.
+                    listHeadIndex++;
+                    return nextListHead();
+                } else {
+                    // Follow pointer.
+                    setValue(nextPtr);
+                    return true;
+                }
+            }
+            return nextListHead();
+        }
+
+        private boolean nextListHead() {
+            // Position to first non-null list-head pointer.
+            while (listHeadIndex < listHeads.length && listHeads[listHeadIndex] == NULL_PTR) {
+                listHeadIndex++;
+            }
+            if (listHeadIndex < listHeads.length) {
+                // Positioned to first non-null list head.
+                setValue(listHeads[listHeadIndex]);
+                return true;
+            } else {
+                // No more lists.
+                frame = null;
+                return false;
+            }
+        }
+
+        private void setValue(long ptr) {
+            frameIndex = getFrameIndex(ptr);
+            frameOff = getFrameOffset(ptr);
+            frame = frames.get(frameIndex);
+            int entryKeyOff = frameOff + ENTRY_HEADER_SIZE;
+            int entryKeyLen = frame.getShort(frameOff);
+            int entryValOff = frameOff + ENTRY_HEADER_SIZE + entryKeyLen;
+            int entryValLen = frame.getShort(frameOff + SLOT_SIZE);
+            val.first.set(frame.array(), entryKeyOff, entryKeyLen);
+            val.second.set(frame.array(), entryValOff, entryValLen);
+        }
+
+        @Override
+        public Pair<BinaryEntry, BinaryEntry> next() {
+            return val;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Remove not implemented");
+        }
+    }
 }