Add ByteArrayPointable datatype.

Change-Id: Iebb5add2363d0f72dcd66ac139339ccf834a9df1
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/174
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/.gitignore b/.gitignore
index 3f833d5..e90f66d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,5 @@
 output
 tmp
 dist
+*.iml
+.idea/
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/ByteArrayPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/ByteArrayPointable.java
new file mode 100644
index 0000000..099d0cc
--- /dev/null
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/ByteArrayPointable.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.data.std.primitive;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.*;
+
+public class ByteArrayPointable extends AbstractPointable implements IHashable, IComparable {
+
+    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+        private static final long serialVersionUID = 1L;
+        // Values of this type are reported as variable-length.
+        @Override
+        public boolean isFixedLength() {
+            return false;
+        }
+        // Always 0; this type is not fixed-length (isFixedLength() returns false).
+        @Override
+        public int getFixedLength() {
+            return 0;
+        }
+    };
+
+    public static final IPointableFactory FACTORY = new IPointableFactory() {
+        private static final long serialVersionUID = 1L;
+        // Each call hands out a new ByteArrayPointable that has not been set on any bytes yet.
+        @Override
+        public IPointable createPointable() {
+            return new ByteArrayPointable();
+        }
+        // Exposes the shared TYPE_TRAITS constant of this class.
+        @Override
+        public ITypeTraits getTypeTraits() {
+            return TYPE_TRAITS;
+        }
+    };
+
+    @Override // delegates to the byte[] overload with the other pointable's array, offset and length
+    public int compareTo(IPointable pointer) {
+        return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+    }
+
+    @Override // unsigned byte-wise order over the content; ties broken by content length
+    public int compareTo(byte[] bytes, int start, int length) {
+        final int thisLen = getLength(this.bytes, this.start);
+        final int thatLen = getLength(bytes, start);
+        for (int i = 0, n = Math.min(thisLen, thatLen); i < n; ++i) {
+            final int mine = 0xff & this.bytes[this.start + SIZE_OF_LENGTH + i];
+            final int theirs = 0xff & bytes[start + SIZE_OF_LENGTH + i];
+            if (mine != theirs) {
+                return mine - theirs;
+            }
+        }
+        return thisLen - thatLen;
+    }
+
+    @Override // 31-based polynomial hash over the content bytes; the length prefix is skipped
+    public int hash() {
+        final int first = start + SIZE_OF_LENGTH, end = first + getLength(bytes, start);
+        int result = 0;
+        for (int pos = first; pos < end; ++pos) {
+            result = 31 * result + bytes[pos];
+        }
+        return result;
+    }
+
+    @Override // full size in bytes (length prefix plus content), read from the bytes themselves
+    public int getLength(){
+        return getFullLength(getByteArray(), getStartOffset());
+    }
+
+    public static final int SIZE_OF_LENGTH = 2; // bytes occupied by the length prefix
+    public static final int MAX_LENGTH = 65535; // largest content length a 2-byte unsigned prefix can hold
+
+    public static int getLength(byte[] bytes, int offset) { // unsigned 16-bit read, high byte first
+        return ((bytes[offset] & 0xFF) << 8) | (bytes[offset + 1] & 0xFF);
+    }
+
+    public static int getFullLength(byte[] bytes, int offset){ // prefix size plus content length
+        return SIZE_OF_LENGTH + getLength(bytes, offset);
+    }
+
+    public static void putLength(int length, byte[] bytes, int offset) { // 16-bit write, high byte first
+        bytes[offset] = (byte) (length >> 8);
+        bytes[offset + 1] = (byte) length;
+    }
+
+}
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/test/java/edu/uci/ics/hyracks/data/std/primitive/ByteArrayPointableTest.java b/hyracks/hyracks-data/hyracks-data-std/src/test/java/edu/uci/ics/hyracks/data/std/primitive/ByteArrayPointableTest.java
new file mode 100644
index 0000000..cc0774f
--- /dev/null
+++ b/hyracks/hyracks-data/hyracks-data-std/src/test/java/edu/uci/ics/hyracks/data/std/primitive/ByteArrayPointableTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.data.std.primitive;
+
+import org.junit.Test;
+
+import javax.xml.bind.DatatypeConverter;
+
+import static org.junit.Assert.*;
+
+public class ByteArrayPointableTest {
+
+    // Builds the pointable layout: a 2-byte length prefix followed by a copy of the payload.
+    public static byte[] generatePointableBytes(byte[] bytes){
+        final int prefix = ByteArrayPointable.SIZE_OF_LENGTH;
+        byte[] encoded = new byte[prefix + bytes.length];
+        System.arraycopy(bytes, 0, encoded, prefix, bytes.length);
+        ByteArrayPointable.putLength(bytes.length, encoded, 0);
+        return encoded;
+    }
+
+    @Test
+    public void testCompareTo() throws Exception {
+        byte [] bytes = generatePointableBytes(new byte[] { 1, 2, 3, 4});
+        ByteArrayPointable byteArrayPointable = new ByteArrayPointable();
+        byteArrayPointable.set(bytes, 0, bytes.length);
+        // identical content compares equal
+        testEqual(byteArrayPointable, generatePointableBytes(new byte[] { 1,2 ,3,4}));
+        // a larger byte at the first difference, or a longer array sharing the prefix, sorts after
+        testLessThan(byteArrayPointable, generatePointableBytes(new byte[] {2}));
+        testLessThan(byteArrayPointable, generatePointableBytes(new byte[] {1,2,3,5}));
+        testLessThan(byteArrayPointable, generatePointableBytes(new byte[] {1,2,3,4,5}));
+        // the empty array, a smaller first byte, or a proper prefix sorts before
+        testGreaterThan(byteArrayPointable, generatePointableBytes(new byte[] { }));
+        testGreaterThan(byteArrayPointable, generatePointableBytes(new byte[] { 0}));
+        testGreaterThan(byteArrayPointable, generatePointableBytes(new byte[] { 1,2,3}));
+
+    }
+
+
+    void testEqual(ByteArrayPointable pointable, byte [] bytes){ // asserts compareTo(bytes) == 0
+        assertTrue(pointable.compareTo(bytes, 0, bytes.length) == 0);
+    }
+
+    void testLessThan(ByteArrayPointable pointable, byte[] bytes){ // asserts compareTo(bytes) < 0
+        assertTrue(pointable.compareTo(bytes, 0, bytes.length) < 0);
+    }
+
+    void testGreaterThan(ByteArrayPointable pointable, byte[] bytes){ // asserts compareTo(bytes) > 0
+        assertTrue(pointable.compareTo(bytes, 0, bytes.length) > 0);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ByteArraySerializerDeserializer.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ByteArraySerializerDeserializer.java
new file mode 100644
index 0000000..0f5cdf5
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ByteArraySerializerDeserializer.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.marshalling;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ByteArraySerializerDeserializer implements ISerializerDeserializer<byte[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    public final static ByteArraySerializerDeserializer INSTANCE = new ByteArraySerializerDeserializer();
+
+    private ByteArraySerializerDeserializer() {
+    }
+
+    @Override // returns length prefix + payload, the same layout ByteArrayPointable.getLength reads
+    public byte[] deserialize(DataInput in) throws HyracksDataException {
+        try {
+            final int payloadSize = in.readUnsignedShort();
+            final byte[] encoded = new byte[ByteArrayPointable.SIZE_OF_LENGTH + payloadSize];
+            ByteArrayPointable.putLength(payloadSize, encoded, 0);
+            in.readFully(encoded, ByteArrayPointable.SIZE_OF_LENGTH, payloadSize);
+            return encoded;
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Writes an already-encoded array (2-byte length prefix followed by the payload) to {@code out}. */
+    @Override
+    public void serialize(byte[] instance, DataOutput out) throws HyracksDataException {
+        // instance includes the prefix, so the limit is MAX_LENGTH payload bytes plus the prefix size.
+        if (instance.length > ByteArrayPointable.MAX_LENGTH + ByteArrayPointable.SIZE_OF_LENGTH) {
+            throw new HyracksDataException(
+                    "encoded byte array too long: " + instance.length + " bytes");
+        }
+        try {
+            out.write(instance, 0, ByteArrayPointable.getFullLength(instance, 0));
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Writes {@code length} encoded bytes (prefix included) from {@code start}; errors report that length. */
+    public void serialize(byte[] instance, int start, int length, DataOutput out) throws HyracksDataException {
+        if (length > ByteArrayPointable.MAX_LENGTH + ByteArrayPointable.SIZE_OF_LENGTH) {
+            throw new HyracksDataException("encoded byte array too long: " + length + " bytes");
+        }
+        try {
+            out.write(instance, start, length);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..be37b21
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+
+public class ByteArrayNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    public static ByteArrayNormalizedKeyComputerFactory INSTANCE = new ByteArrayNormalizedKeyComputerFactory();
+
+    /** Creates a computer packing 3 content bytes plus the top 7 bits of the 4th into a non-negative int. */
+    @Override public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            @Override public int normalize(byte[] bytes, int start, int length) {
+                final int contentStart = start + ByteArrayPointable.SIZE_OF_LENGTH;
+                final int contentLength = ByteArrayPointable.getLength(bytes, start);
+                int key = 0;
+                // positions beyond the content length contribute 0
+                for (int i = 0; i < 3; ++i) {
+                    key = (key << 8) | (i < contentLength ? bytes[contentStart + i] & 0xff : 0);
+                }
+                // only 7 bits of the fourth byte are used so the sign bit stays clear
+                key <<= 7;
+                if (contentLength > 3) {
+                    key |= (bytes[contentStart + 3] & 0xff) >> 1;
+                }
+                return key;
+            }
+        };
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayBase64ParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayBase64ParserFactory.java
new file mode 100644
index 0000000..edb4136
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayBase64ParserFactory.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class ByteArrayBase64ParserFactory implements IValueParserFactory {
+
+    public static final ByteArrayBase64ParserFactory INSTANCE = new ByteArrayBase64ParserFactory();
+
+    private ByteArrayBase64ParserFactory() {
+    }
+
+    @Override public IValueParser createValueParser() {
+        return new IValueParser() {
+            private byte[] buffer; // decode target kept between parse() calls; replaced when it must grow
+            private byte[] quadruplet = new byte[4]; // scratch space for the four values of one base64 group
+            // Rejects lengths that are not a multiple of 4, then decodes and writes prefix + payload to out.
+            @Override public void parse(char[] input, int start, int length, DataOutput out)
+                    throws HyracksDataException {
+                if (length % 4 != 0) {
+                    throw new HyracksDataException(
+                            "Invalid Base64 string, the length of the string should be a multiple of 4");
+                }
+                buffer = extractPointableArrayFromBase64String(input, start, length, buffer, quadruplet);
+                try {
+                    out.write(buffer, 0, ByteArrayPointable.getFullLength(buffer, 0));
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+    // The following base64 related implementation is copied and adapted from javax.xml.bind.DatatypeConverterImpl.java
+    private static final byte[] decodeMap = initDecodeMap();
+    private static final byte PADDING = 127;
+
+    /** Builds the ASCII-indexed decode table: 6-bit value per alphabet char, PADDING for '=', -1 elsewhere. */
+    private static byte[] initDecodeMap() {
+        final byte[] table = new byte[128];
+        Arrays.fill(table, (byte) -1);
+        // values are handed out in alphabet order: A-Z, a-z, 0-9, '+', '/'
+        byte value = 0;
+        for (char c = 'A'; c <= 'Z'; c++) {
+            table[c] = value++;
+        }
+        for (char c = 'a'; c <= 'z'; c++) {
+            table[c] = value++;
+        }
+        for (char c = '0'; c <= '9'; c++) {
+            table[c] = value++;
+        }
+        table['+'] = value++;
+        table['/'] = value;
+        table['='] = PADDING;
+        return table;
+    }
+
+    /**
+     * computes the length of binary data speculatively.
+     * Our requirement is to create byte[] of the exact length to store the binary data.
+     * If we do this in a straight-forward way, it takes two passes over the data.
+     * Experiments show that this is a non-trivial overhead (35% or so is spent on
+     * the first pass in calculating the length.)
+     * So the approach here is that we compute the length speculatively, without looking
+     * at the whole contents. The obtained speculative value is never less than the
+     * actual length of the binary data, but it may be bigger. So if the speculation
+     * goes wrong, we'll pay the cost of reallocation and buffer copying.
+     * If the base64 text is tightly packed with no indentation nor illegal char
+     * (like what most web services produce), then the speculation of this method
+     * will be correct, so we get the performance benefit.
+     */
+    private static int guessLength(char[] chars, int start, int length) {
+        final int upperBound = length / 4 * 3;
+
+        // count the trailing '=' chars
+        int j = length - 1;
+        for (; j >= 0; j--) {
+            char ch = chars[start + j];
+            // decodeMap only covers 7-bit ASCII; a char beyond it is handled like any other
+            // illegal char instead of failing with ArrayIndexOutOfBoundsException
+            byte code = ch < decodeMap.length ? decodeMap[ch] : -1;
+            if (code == PADDING) {
+                continue;
+            }
+            if (code == -1) {
+                // most likely this base64 text is indented. go with the upper bound
+                return upperBound;
+            }
+            break;
+        }
+
+        j++;    // j is now the number of chars in front of the trailing padding
+        int padSize = length - j;
+
+        // more than two '=' means something is wrong with the base64: stay with the upper bound;
+        // otherwise assume tightly packed text and subtract the padding
+        return padSize > 2 ? upperBound : upperBound - padSize;
+    }
+
+    private static int guessLength(byte[] chars, int start, int length) {
+        final int upperBound = length / 4 * 3;
+
+        // count the trailing '=' chars
+        int j = length - 1;
+        for (; j >= 0; j--) {
+            byte raw = chars[start + j];
+            // a negative byte is not ASCII and would index decodeMap out of bounds;
+            // it is handled like any other illegal char
+            byte code = raw >= 0 ? decodeMap[raw] : -1;
+            if (code == PADDING) {
+                continue;
+            }
+            if (code == -1) {
+                // most likely this base64 text is indented. go with the upper bound
+                return upperBound;
+            }
+            break;
+        }
+
+        j++;    // j is now the number of chars in front of the trailing padding
+        int padSize = length - j;
+
+        // more than two '=' means something is wrong with the base64: stay with the upper bound;
+        // otherwise assume tightly packed text and subtract the padding
+        return padSize > 2 ? upperBound : upperBound - padSize;
+    }
+
+    /** Decodes base64 bytes into a length-prefixed array, reusing {@code bufferNeedToReset} when large enough. */
+    public static byte[] extractPointableArrayFromBase64String(byte[] input, int start, int length,
+            byte[] bufferNeedToReset, byte[] quadruplet)
+            throws HyracksDataException {
+        final int prefix = ByteArrayPointable.SIZE_OF_LENGTH;
+        byte[] target = ByteArrayHexParserFactory.ensureCapacity(guessLength(input, start, length) + prefix,
+                bufferNeedToReset);
+        final int decoded = parseBase64String(input, start, length, target, prefix, quadruplet);
+        if (decoded > ByteArrayPointable.MAX_LENGTH) {
+            throw new HyracksDataException("The decoded byte array is too long.");
+        }
+        ByteArrayPointable.putLength(decoded, target, 0);
+        return target;
+    }
+
+    /** Decodes base64 chars into a length-prefixed array, reusing {@code bufferNeedToReset} when large enough. */
+    public static byte[] extractPointableArrayFromBase64String(char[] input, int start, int length,
+            byte[] bufferNeedToReset, byte[] quadruplet)
+            throws HyracksDataException {
+        final int prefix = ByteArrayPointable.SIZE_OF_LENGTH;
+        byte[] target = ByteArrayHexParserFactory.ensureCapacity(guessLength(input, start, length) + prefix,
+                bufferNeedToReset);
+        final int decoded = parseBase64String(input, start, length, target, prefix, quadruplet);
+        if (decoded > ByteArrayPointable.MAX_LENGTH) {
+            throw new HyracksDataException("The decoded byte array is too long.");
+        }
+        ByteArrayPointable.putLength(decoded, target, 0);
+        return target;
+    }
+
+    /**
+     * Decodes base64 chars into {@code out} starting at {@code offset} and returns the byte count;
+     * any char outside the alphabet (non-ASCII included) raises HyracksDataException.
+     */
+    static int parseBase64String(char[] input, int start, int length, byte[] out, int offset,
+            byte[] quadruplet) throws HyracksDataException {
+        int outLength = 0;
+        int q = 0;
+        // convert each quadruplet to three bytes.
+        for (int i = 0; i < length; i++) {
+            char ch = input[start + i];
+            // guard the lookup: decodeMap only covers 7-bit ASCII
+            byte v = ch < decodeMap.length ? decodeMap[ch] : -1;
+            if (v == -1) {
+                throw new HyracksDataException("Invalid Base64 character");
+            }
+            quadruplet[q++] = v;
+
+            if (q == 4) {
+                // quadruplet is now filled.
+                out[offset + outLength++] = (byte) ((quadruplet[0] << 2) | (quadruplet[1] >> 4));
+                if (quadruplet[2] != PADDING) {
+                    out[offset + outLength++] = (byte) ((quadruplet[1] << 4) | (quadruplet[2] >> 2));
+                }
+                if (quadruplet[3] != PADDING) {
+                    out[offset + outLength++] = (byte) ((quadruplet[2] << 6) | (quadruplet[3]));
+                }
+                q = 0;
+            }
+        }
+        return outLength;
+    }
+
+    /**
+     * Decodes base64 bytes into {@code out} starting at {@code offset} and returns the byte count;
+     * any byte outside the alphabet (negative, i.e. non-ASCII, included) raises HyracksDataException.
+     */
+    static int parseBase64String(byte[] input, int start, int length, byte[] out, int offset,
+            byte[] quadruplet) throws HyracksDataException {
+        int outLength = 0;
+        int q = 0;
+        // convert each quadruplet to three bytes.
+        for (int i = 0; i < length; i++) {
+            byte raw = input[start + i];
+            // guard the lookup: a negative byte would otherwise index decodeMap out of bounds
+            byte v = raw >= 0 ? decodeMap[raw] : -1;
+            if (v == -1) {
+                throw new HyracksDataException("Invalid Base64 character");
+            }
+            quadruplet[q++] = v;
+
+            if (q == 4) {
+                // quadruplet is now filled.
+                out[offset + outLength++] = (byte) ((quadruplet[0] << 2) | (quadruplet[1] >> 4));
+                if (quadruplet[2] != PADDING) {
+                    out[offset + outLength++] = (byte) ((quadruplet[1] << 4) | (quadruplet[2] >> 2));
+                }
+                if (quadruplet[3] != PADDING) {
+                    out[offset + outLength++] = (byte) ((quadruplet[2] << 6) | (quadruplet[3]));
+                }
+                q = 0;
+            }
+        }
+        return outLength;
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayHexParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayHexParserFactory.java
new file mode 100644
index 0000000..e8410ea
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayHexParserFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class ByteArrayHexParserFactory implements IValueParserFactory {
+    public static ByteArrayHexParserFactory INSTANCE = new ByteArrayHexParserFactory();
+
+    private ByteArrayHexParserFactory() {
+    }
+
+    @Override public IValueParser createValueParser() {
+        return new IValueParser() {
+            private byte[] buffer = new byte[] { }; // decode target kept between parse() calls
+            // Decodes the hex text into buffer, then writes length prefix + payload to out.
+            @Override public void parse(char[] input, int start, int length, DataOutput out)
+                    throws HyracksDataException {
+                try {
+                    buffer = extractPointableArrayFromHexString(input, start, length, buffer);
+                    out.write(buffer, 0, ByteArrayPointable.getFullLength(buffer, 0));
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+    /** Tells whether {@code c} is a hexadecimal digit: 0-9, a-f or A-F. */
+    public static boolean isValidHexChar(char c) {
+        final boolean decimal = c >= '0' && c <= '9';
+        final boolean lower = c >= 'a' && c <= 'f';
+        final boolean upper = c >= 'A' && c <= 'F';
+        // only these three ASCII ranges qualify
+        return decimal || lower || upper;
+    }
+
+    /** Decodes hex chars into a length-prefixed array; odd or over-long input is rejected before decoding. */
+    public static byte[] extractPointableArrayFromHexString(char[] input, int start, int length,
+            byte[] bufferNeedToReset) throws HyracksDataException {
+        if (length % 2 != 0) {
+            throw new HyracksDataException(
+                    "Invalid hex string for binary type: the string length should be a multiple of 2.");
+        }
+        int byteLength = length / 2;
+        if (byteLength > ByteArrayPointable.MAX_LENGTH) {
+            throw new HyracksDataException("The decoded byte array is too long.");
+        }
+        bufferNeedToReset = ensureCapacity(byteLength + ByteArrayPointable.SIZE_OF_LENGTH, bufferNeedToReset);
+        extractByteArrayFromHexString(input, start, length, bufferNeedToReset, ByteArrayPointable.SIZE_OF_LENGTH);
+        ByteArrayPointable.putLength(byteLength, bufferNeedToReset, 0);
+        return bufferNeedToReset;
+    }
+
+    /** Decodes hex bytes into a length-prefixed array; odd or over-long input is rejected before decoding. */
+    public static byte[] extractPointableArrayFromHexString(byte[] input, int start, int length,
+            byte[] bufferNeedToReset) throws HyracksDataException {
+        if (length % 2 != 0) {
+            throw new HyracksDataException(
+                    "Invalid hex string for binary type: the string length should be a multiple of 2.");
+        }
+        int byteLength = length / 2;
+        if (byteLength > ByteArrayPointable.MAX_LENGTH) {
+            throw new HyracksDataException("The decoded byte array is too long.");
+        }
+        bufferNeedToReset = ensureCapacity(byteLength + ByteArrayPointable.SIZE_OF_LENGTH, bufferNeedToReset);
+        extractByteArrayFromHexString(input, start, length, bufferNeedToReset, ByteArrayPointable.SIZE_OF_LENGTH);
+        ByteArrayPointable.putLength(byteLength, bufferNeedToReset, 0);
+        return bufferNeedToReset;
+    }
+
+    /** Returns an array of at least {@code capacity} bytes, reusing {@code original} when possible. */
+    static byte[] ensureCapacity(int capacity, byte[] original) {
+        if (original == null) {
+            return new byte[capacity];
+        }
+        // a too-small array is replaced by a copy of exactly the requested capacity,
+        // which still carries the old bytes at the front
+        return original.length >= capacity ? original : Arrays.copyOf(original, capacity);
+    }
+
+    private static int getValueFromValidHexChar(char c) throws HyracksDataException { // digit value 0-15
+        if (c >= '0' && c <= '9') {
+            return c - '0';
+        }
+        if (c >= 'a' && c <= 'f') {
+            return c - 'a' + 10;
+        }
+        if (c >= 'A' && c <= 'F') {
+            return c - 'A' + 10;
+        }
+        throw new HyracksDataException("Invalid hex character : " + c);
+    }
+
+    private static void extractByteArrayFromHexString(char[] input, int start, int length, byte[] output,
+            int offset) throws HyracksDataException {
+        for (int in = start, end = start + length, to = offset; in < end; in += 2, to++) {
+            int high = getValueFromValidHexChar(input[in]); // first char of a pair is the high nibble
+            output[to] = (byte) ((high << 4) + getValueFromValidHexChar(input[in + 1]));
+        }
+    }
+
+    private static void extractByteArrayFromHexString(byte[] input, int start, int length, byte[] output,
+            int offset) throws HyracksDataException {
+        for (int in = start, end = start + length, to = offset; in < end; in += 2, to++) {
+            int high = getValueFromValidHexChar((char)input[in]); // first byte of a pair is the high nibble
+            output[to] = (byte) ((high << 4) + getValueFromValidHexChar((char)input[in + 1]));
+        }
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java
index c63ca2d..fc4e8f7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/util/StringUtils.java
@@ -38,4 +38,6 @@
         dos.write((len >>> 8) & 0xFF);
         dos.write((len >>> 0) & 0xFF);
     }
+
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ByteArraySerializerDeserializerTest.java b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ByteArraySerializerDeserializerTest.java
new file mode 100644
index 0000000..2c33156
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ByteArraySerializerDeserializerTest.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.marshalling;
+
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trip tests for {@link ByteArraySerializerDeserializer} and for the length-field helpers of
+ * {@link ByteArrayPointable}.
+ */
+public class ByteArraySerializerDeserializerTest {
+    Random random = new Random();
+
+    /**
+     * Builds a random array laid out as a length field of {@link ByteArrayPointable#SIZE_OF_LENGTH} bytes
+     * followed by the payload.
+     *
+     * @param maxSize exclusive upper bound of the payload size
+     * @param random  source of the payload size and of the payload bytes
+     * @return the length field followed by {@code size} random payload bytes
+     */
+    public static byte[] generateRandomBytes(int maxSize, Random random) {
+        int size = random.nextInt(maxSize);
+        byte[] bytes = new byte[size + ByteArrayPointable.SIZE_OF_LENGTH];
+        random.nextBytes(bytes);
+        // Overwrite the random bytes at the front with the real payload length.
+        ByteArrayPointable.putLength(size, bytes, 0);
+        return bytes;
+    }
+
+    /** Serializing must emit the array unchanged, and deserializing that output must restore it. */
+    @Test
+    public void testSerializeDeserializeRandomBytes() throws Exception {
+        for (int i = 0; i < 10; ++i) {
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            byte[] randomBytes = generateRandomBytes(ByteArrayPointable.MAX_LENGTH + 1, random);
+
+            ByteArraySerializerDeserializer.INSTANCE.serialize(randomBytes, new DataOutputStream(outputStream));
+            byte[] result = outputStream.toByteArray();
+            // The inputs are random, so report where the arrays differ instead of a bare "false".
+            assertArrayEquals(randomBytes, result);
+
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(result);
+            assertArrayEquals(randomBytes,
+                    ByteArraySerializerDeserializer.INSTANCE.deserialize(new DataInputStream(inputStream)));
+        }
+    }
+
+    /** A length stored with putLength must be returned by getLength at each tested offset. */
+    @Test
+    public void testPutGetLength() throws Exception {
+        final int size = 5;
+        byte[] newBytes = new byte[size];
+        for (int i = 0; i < 10; ++i) {
+            int length = random.nextInt(ByteArrayPointable.MAX_LENGTH + 1);
+            for (int j = 0; j < size - 1; ++j) {
+                ByteArrayPointable.putLength(length, newBytes, j);
+                assertEquals(length, ByteArrayPointable.getLength(newBytes, j));
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..e803db3
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializerTest;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that the keys produced by {@link ByteArrayNormalizedKeyComputerFactory} never contradict the order
+ * defined by {@code ByteArrayPointable.compareTo}.
+ */
+public class ByteArrayNormalizedKeyComputerFactoryTest {
+
+    Random random = new Random();
+
+    INormalizedKeyComputer computer = ByteArrayNormalizedKeyComputerFactory.INSTANCE.createNormalizedKeyComputer();
+
+    /**
+     * Wraps a random length-prefixed array from {@code ByteArraySerializerDeserializerTest.generateRandomBytes}
+     * in a pointable.
+     *
+     * @param maxSize passed through to {@code generateRandomBytes}
+     * @param random  source of randomness
+     * @return a pointable set over the whole generated array
+     */
+    public static ByteArrayPointable generateRandomByteArrayPointable(int maxSize, Random random) {
+        byte[] bytes = ByteArraySerializerDeserializerTest.generateRandomBytes(maxSize, random);
+        ByteArrayPointable pointable = new ByteArrayPointable();
+        pointable.set(bytes, 0, bytes.length);
+        return pointable;
+    }
+
+    /**
+     * Builds a pointable over a length field followed by exactly {@code length} random payload bytes.
+     *
+     * @param length payload size in bytes
+     * @param random source of the payload bytes
+     * @return a pointable set over the whole generated array
+     */
+    public static ByteArrayPointable generateRandomByteArrayPointableWithFixLength(int length, Random random) {
+        byte[] bytes = new byte[length + ByteArrayPointable.SIZE_OF_LENGTH];
+        random.nextBytes(bytes);
+        ByteArrayPointable.putLength(length, bytes, 0);
+        ByteArrayPointable pointable = new ByteArrayPointable();
+        pointable.set(bytes, 0, bytes.length);
+        return pointable;
+    }
+
+    /**
+     * Asserts that a strict order between the two normalized keys matches the sign of {@code compareTo}.
+     * Nothing is asserted when the keys are equal.
+     */
+    public static void assertNormalizeValue(ByteArrayPointable pointable1, ByteArrayPointable pointable2,
+            INormalizedKeyComputer computer) {
+        int n1 = computer.normalize(pointable1.getByteArray(), pointable1.getStartOffset(), pointable1.getLength());
+        int n2 = computer.normalize(pointable2.getByteArray(), pointable2.getStartOffset(), pointable2.getLength());
+        if (n1 < n2) {
+            assertTrue(pointable1.compareTo(pointable2) < 0);
+        } else if (n1 > n2) {
+            assertTrue(pointable1.compareTo(pointable2) > 0);
+        }
+    }
+
+    @Test
+    public void testRandomNormalizedKey() {
+        for (int i = 0; i < 10; ++i) {
+            ByteArrayPointable pointable1 = generateRandomByteArrayPointable(ByteArrayPointable.MAX_LENGTH + 1,
+                    random);
+            ByteArrayPointable pointable2 = generateRandomByteArrayPointable(ByteArrayPointable.MAX_LENGTH + 1,
+                    random);
+            assertNormalizeValue(pointable1, pointable2, computer);
+        }
+    }
+
+    @Test
+    public void testCornerCase() {
+        // Payloads shorter than four bytes.
+        for (int len = 0; len < 4; ++len) {
+            ByteArrayPointable pointable1 = generateRandomByteArrayPointableWithFixLength(len, random);
+            ByteArrayPointable pointable2 = generateRandomByteArrayPointableWithFixLength(len, random);
+            assertNormalizeValue(pointable1, pointable2, computer);
+        }
+
+        // (byte) 130 is negative as a Java byte; its key must still sort above the key of a 0 payload byte.
+        byte[] bytes1 = new byte[] { 0, 4, 0, 25, 34, 42 };
+        byte[] bytes2 = new byte[] { 0, 4, (byte) 130, 25, 34, 42 };
+
+        int n1 = computer.normalize(bytes1, 0, bytes1.length);
+        int n2 = computer.normalize(bytes2, 0, bytes2.length);
+        assertTrue(n1 < n2);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayBase64ParserFactoryTest.java b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayBase64ParserFactoryTest.java
new file mode 100644
index 0000000..34de7fb
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayBase64ParserFactoryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+import org.junit.Test;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+
+import static edu.uci.ics.hyracks.dataflow.common.data.parsers.ByteArrayHexParserFactoryTest.subArray;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks the parser created by {@link ByteArrayBase64ParserFactory} against {@link DatatypeConverter}, which
+ * serves as the reference Base64 codec.
+ */
+public class ByteArrayBase64ParserFactoryTest {
+
+    /** Returns the bytes that follow the length field at the start of {@code serialized}. */
+    private static byte[] payloadOf(byte[] serialized) {
+        return subArray(serialized, ByteArrayPointable.SIZE_OF_LENGTH);
+    }
+
+    @Test
+    public void testParseBase64String() throws HyracksDataException {
+        IValueParser parser = ByteArrayBase64ParserFactory.INSTANCE.createValueParser();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream outputStream = new DataOutputStream(bos);
+
+        // Empty input: the length field is 0 and no payload follows it.
+        String empty = "";
+        parser.parse(empty.toCharArray(), 0, empty.length(), outputStream);
+        byte[] cache = bos.toByteArray();
+        assertEquals(0, ByteArrayPointable.getLength(cache, 0));
+        assertTrue(DatatypeConverter.printBase64Binary(payloadOf(cache)).equalsIgnoreCase(empty));
+
+        // Every character of the Base64 alphabet, once each.
+        StringBuilder everyChar = new StringBuilder();
+        for (char c = 'a'; c <= 'z'; c++) {
+            everyChar.append(c);
+        }
+        for (char c = 'A'; c <= 'Z'; c++) {
+            everyChar.append(c);
+        }
+        for (char c = '0'; c <= '9'; c++) {
+            everyChar.append(c);
+        }
+        everyChar.append("+/");
+
+        bos.reset();
+        parser.parse(everyChar.toString().toCharArray(), 0, everyChar.length(), outputStream);
+        cache = bos.toByteArray();
+        byte[] answer = DatatypeConverter.parseBase64Binary(everyChar.toString());
+        assertEquals(answer.length, ByteArrayPointable.getLength(cache, 0));
+        assertArrayEquals(answer, payloadOf(cache));
+
+        // A payload of MAX_LENGTH bytes.
+        byte[] maxBytes = new byte[ByteArrayPointable.MAX_LENGTH];
+        Arrays.fill(maxBytes, (byte) 0xff);
+        String maxString = DatatypeConverter.printBase64Binary(maxBytes);
+        bos.reset();
+        parser.parse(maxString.toCharArray(), 0, maxString.length(), outputStream);
+        cache = bos.toByteArray();
+        assertEquals(maxBytes.length, ByteArrayPointable.getLength(cache, 0));
+        assertArrayEquals(maxBytes, payloadOf(cache));
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayHexParserFactoryTest.java b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayHexParserFactoryTest.java
new file mode 100644
index 0000000..7da2bdf
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/test/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/ByteArrayHexParserFactoryTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import edu.uci.ics.hyracks.data.std.primitive.ByteArrayPointable;
+import org.junit.Test;
+
+import javax.xml.bind.DatatypeConverter;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks {@code ByteArrayHexParserFactory.extractPointableArrayFromHexString} against
+ * {@link DatatypeConverter}, which serves as the reference hex codec.
+ */
+public class ByteArrayHexParserFactoryTest {
+
+    /**
+     * Copies the tail of an array.
+     *
+     * @param bytes source array
+     * @param start index of the first byte to copy
+     * @return a new array holding {@code bytes[start]} through the last element of {@code bytes}
+     */
+    public static byte[] subArray(byte[] bytes, int start) {
+        return Arrays.copyOfRange(bytes, start, bytes.length);
+    }
+
+    /**
+     * Parses {@code hex} and asserts that the length field holds half the number of hex digits and that the
+     * bytes after the length field print back to {@code hex}, ignoring case.
+     *
+     * @param hex   hex string to parse
+     * @param cache array handed to the parser as its last argument
+     * @return the array returned by the parser, to be handed to the next call
+     */
+    private static byte[] assertHexRoundTrip(String hex, byte[] cache) throws Exception {
+        byte[] parsed = ByteArrayHexParserFactory
+                .extractPointableArrayFromHexString(hex.toCharArray(), 0, hex.length(), cache);
+        assertEquals(hex.length() / 2, ByteArrayPointable.getLength(parsed, 0));
+        byte[] payload = subArray(parsed, ByteArrayPointable.SIZE_OF_LENGTH);
+        assertTrue(DatatypeConverter.printHexBinary(payload).equalsIgnoreCase(hex));
+        return parsed;
+    }
+
+    @Test
+    public void testExtractPointableArrayFromHexString() throws Exception {
+        byte[] cache = new byte[] { };
+
+        cache = assertHexRoundTrip("", cache);
+        // Upper-case and lower-case digits must both be accepted.
+        cache = assertHexRoundTrip("ABCDEF0123456789", cache);
+        cache = assertHexRoundTrip("0123456789abcdef", cache);
+
+        // MAX_LENGTH bytes written as two hex digits each.
+        char[] maxChars = new char[ByteArrayPointable.MAX_LENGTH * 2];
+        Arrays.fill(maxChars, 'f');
+        assertHexRoundTrip(new String(maxChars), cache);
+    }
+}
\ No newline at end of file