Moved to filescanners that do not need object construction

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@75 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
index 996d7c3..09d8840 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.common.comm.io;
 
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
@@ -69,4 +70,21 @@
         serDeser.serialize(instance, dos);
         fEndOffsets[nextField++] = baaos.size();
     }
+
+    public void addField(byte[] bytes, int start, int length) throws HyracksDataException {
+        try {
+            dos.write(bytes, start, length);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        fEndOffsets[nextField++] = baaos.size();
+    }
+
+    public DataOutput getDataOutput() {
+        return dos;
+    }
+
+    public void addFieldEndOffset() {
+        fEndOffsets[nextField++] = baaos.size();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
index 06255c1..ed34fcb 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
@@ -16,6 +16,9 @@
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 public class FrameUtils {
     public static void copy(ByteBuffer srcFrame, ByteBuffer destFrame) {
         makeReadable(srcFrame);
@@ -27,4 +30,12 @@
         frame.position(0);
         frame.limit(frame.capacity());
     }
+
+    public static void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        writer.nextFrame(buffer);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/StringBinaryComparatorFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/UTF8StringBinaryComparatorFactory.java
similarity index 88%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/StringBinaryComparatorFactory.java
rename to hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/UTF8StringBinaryComparatorFactory.java
index 8ab015d..439859d 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/StringBinaryComparatorFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/comparators/UTF8StringBinaryComparatorFactory.java
@@ -18,12 +18,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.util.StringUtils;
 
-public class StringBinaryComparatorFactory implements IBinaryComparatorFactory {
+public class UTF8StringBinaryComparatorFactory implements IBinaryComparatorFactory {
     private static final long serialVersionUID = 1L;
 
-    public static final StringBinaryComparatorFactory INSTANCE = new StringBinaryComparatorFactory();
+    public static final UTF8StringBinaryComparatorFactory INSTANCE = new UTF8StringBinaryComparatorFactory();
 
-    private StringBinaryComparatorFactory() {
+    private UTF8StringBinaryComparatorFactory() {
     }
 
     @Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/StringBinaryHashFunctionFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/UTF8StringBinaryHashFunctionFactory.java
similarity index 86%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/StringBinaryHashFunctionFactory.java
rename to hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/UTF8StringBinaryHashFunctionFactory.java
index 3c358fc..98634b6 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/StringBinaryHashFunctionFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/hash/UTF8StringBinaryHashFunctionFactory.java
@@ -18,12 +18,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.util.StringUtils;
 
-public class StringBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
-    public static final StringBinaryHashFunctionFactory INSTANCE = new StringBinaryHashFunctionFactory();
+public class UTF8StringBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+    public static final UTF8StringBinaryHashFunctionFactory INSTANCE = new UTF8StringBinaryHashFunctionFactory();
 
     private static final long serialVersionUID = 1L;
 
-    private StringBinaryHashFunctionFactory() {
+    private UTF8StringBinaryHashFunctionFactory() {
     }
 
     @Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/StringSerializerDeserializer.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/UTF8StringSerializerDeserializer.java
similarity index 85%
rename from hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/StringSerializerDeserializer.java
rename to hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/UTF8StringSerializerDeserializer.java
index bc6fd4a..0fa887f 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/StringSerializerDeserializer.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/UTF8StringSerializerDeserializer.java
@@ -21,12 +21,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public class StringSerializerDeserializer implements ISerializerDeserializer<String> {
-    public static final StringSerializerDeserializer INSTANCE = new StringSerializerDeserializer();
+public class UTF8StringSerializerDeserializer implements ISerializerDeserializer<String> {
+    public static final UTF8StringSerializerDeserializer INSTANCE = new UTF8StringSerializerDeserializer();
 
     private static final long serialVersionUID = 1L;
 
-    private StringSerializerDeserializer() {
+    private UTF8StringSerializerDeserializer() {
     }
 
     @Override
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/DoubleParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/DoubleParserFactory.java
new file mode 100644
index 0000000..6363c18
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/DoubleParserFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DoubleParserFactory implements IValueParserFactory {
+    public static final IValueParserFactory INSTANCE = new DoubleParserFactory();
+
+    private static final long serialVersionUID = 1L;
+
+    private DoubleParserFactory() {
+    }
+
+    @Override
+    public IValueParser createValueParser() {
+        return new IValueParser() {
+            @Override
+            public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+                String s = String.valueOf(buffer, start, length);
+                try {
+                    out.writeDouble(Double.parseDouble(s));
+                } catch (NumberFormatException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/FloatParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/FloatParserFactory.java
new file mode 100644
index 0000000..d20c98e
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/FloatParserFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FloatParserFactory implements IValueParserFactory {
+    public static final IValueParserFactory INSTANCE = new FloatParserFactory();
+
+    private static final long serialVersionUID = 1L;
+
+    private FloatParserFactory() {
+    }
+
+    @Override
+    public IValueParser createValueParser() {
+        return new IValueParser() {
+            @Override
+            public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+                String s = String.valueOf(buffer, start, length);
+                try {
+                    out.writeFloat(Float.parseFloat(s));
+                } catch (NumberFormatException e) {
+                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParser.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParser.java
new file mode 100644
index 0000000..ac07ca2
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParser.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IValueParser {
+    void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParserFactory.java
new file mode 100644
index 0000000..0d8725a
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IValueParserFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.Serializable;
+
+public interface IValueParserFactory extends Serializable {
+    public IValueParser createValueParser();
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
new file mode 100644
index 0000000..3a71afa
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/IntegerParserFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class IntegerParserFactory implements IValueParserFactory {
+    public static final IValueParserFactory INSTANCE = new IntegerParserFactory();
+
+    private static final long serialVersionUID = 1L;
+
+    private IntegerParserFactory() {
+    }
+
+    @Override
+    public IValueParser createValueParser() {
+        return new IValueParser() {
+            @Override
+            public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+                int n = 0;
+                int sign = 1;
+                int i = 0;
+                boolean pre = true;
+                for (; pre && i < length; ++i) {
+                    char ch = buffer[i + start];
+                    switch (ch) {
+                        case ' ':
+                        case '\t':
+                        case '\n':
+                        case '\r':
+                        case '\f':
+                            break;
+
+                        case '-':
+                            sign = -1;
+
+                        case '0':
+                        case '1':
+                        case '2':
+                        case '3':
+                        case '4':
+                        case '5':
+                        case '6':
+                        case '7':
+                        case '8':
+                        case '9':
+                            pre = false;
+                            break;
+
+                        default:
+                            throw new HyracksDataException("Encountered " + ch);
+                    }
+                }
+                boolean post = false;
+                for (; !post && i < length; ++i) {
+                    char ch = buffer[i + start];
+                    switch (ch) {
+                        case '0':
+                        case '1':
+                        case '2':
+                        case '3':
+                        case '4':
+                        case '5':
+                        case '6':
+                        case '7':
+                        case '8':
+                        case '9':
+                            n = n * 10 + (ch - '0');
+                            break;
+                    }
+                }
+
+                for (; i < length; ++i) {
+                    char ch = buffer[i + start];
+                    switch (ch) {
+                        case ' ':
+                        case '\t':
+                        case '\n':
+                        case '\r':
+                        case '\f':
+                            break;
+
+                        default:
+                            throw new HyracksDataException("Encountered " + ch);
+                    }
+                }
+
+                try {
+                    out.writeInt(n * sign);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
new file mode 100644
index 0000000..927d21c
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class LongParserFactory implements IValueParserFactory {
+    public static final IValueParserFactory INSTANCE = new LongParserFactory();
+
+    private static final long serialVersionUID = 1L;
+
+    private LongParserFactory() {
+    }
+
+    @Override
+    public IValueParser createValueParser() {
+        return new IValueParser() {
+            @Override
+            public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+                long n = 0;
+                int sign = 1;
+                int i = 0;
+                boolean pre = true;
+                for (; pre && i < length; ++i) {
+                    char ch = buffer[i + start];
+                    switch (ch) {
+                        case ' ':
+                        case '\t':
+                        case '\n':
+                        case '\r':
+                        case '\f':
+                            break;
+
+                        case '-':
+                            sign = -1;
+
+                        case '0':
+                        case '1':
+                        case '2':
+                        case '3':
+                        case '4':
+                        case '5':
+                        case '6':
+                        case '7':
+                        case '8':
+                        case '9':
+                            pre = false;
+                            break;
+
+                        default:
+                            throw new HyracksDataException("Encountered " + ch);
+                    }
+                }
+                boolean post = false;
+                for (; !post && i < length; ++i) {
+                    char ch = buffer[i + start];
+                    switch (ch) {
+                        case '0':
+                        case '1':
+                        case '2':
+                        case '3':
+                        case '4':
+                        case '5':
+                        case '6':
+                        case '7':
+                        case '8':
+                        case '9':
+                            n = n * 10 + (ch - '0');
+                            break;
+                    }
+                }
+
+                for (; i < length; ++i) {
+                    char ch = buffer[i + start];
+                    switch (ch) {
+                        case ' ':
+                        case '\t':
+                        case '\n':
+                        case '\r':
+                        case '\f':
+                            break;
+
+                        default:
+                            throw new HyracksDataException("Encountered " + ch);
+                    }
+                }
+
+                try {
+                    out.writeLong(n * sign);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/UTF8StringParserFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/UTF8StringParserFactory.java
new file mode 100644
index 0000000..8ddddac
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/UTF8StringParserFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.parsers;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class UTF8StringParserFactory implements IValueParserFactory {
+    public static final IValueParserFactory INSTANCE = new UTF8StringParserFactory();
+
+    private static final long serialVersionUID = 1L;
+
+    private UTF8StringParserFactory() {
+    }
+
+    @Override
+    public IValueParser createValueParser() {
+        return new IValueParser() {
+            private byte[] utf8;
+
+            @Override
+            public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
+                int utflen = 0;
+                for (int i = 0; i < length; i++) {
+                    char ch = buffer[i + start];
+                    if ((ch >= 0x0001) && (ch <= 0x007F)) {
+                        utflen++;
+                    } else if (ch > 0x07ff) {
+                        utflen += 3;
+                    } else {
+                        utflen += 2;
+                    }
+                }
+
+                if (utf8 == null || utf8.length < utflen + 2) {
+                    utf8 = new byte[utflen + 2];
+                }
+
+                int count = 0;
+                utf8[count++] = (byte) ((utflen >>> 8) & 0xff);
+                utf8[count++] = (byte) ((utflen >>> 0) & 0xff);
+
+                int i = 0;
+                for (i = 0; i < length; i++) {
+                    char ch = buffer[i + start];
+                    if (!((ch >= 0x0001) && (ch <= 0x007F)))
+                        break;
+                    utf8[count++] = (byte) ch;
+                }
+
+                for (; i < length; i++) {
+                    char ch = buffer[i + start];
+                    if ((ch >= 0x0001) && (ch <= 0x007F)) {
+                        utf8[count++] = (byte) ch;
+                    } else if (ch > 0x07FF) {
+                        utf8[count++] = (byte) (0xE0 | ((ch >> 12) & 0x0F));
+                        utf8[count++] = (byte) (0x80 | ((ch >> 6) & 0x3F));
+                        utf8[count++] = (byte) (0x80 | ((ch >> 0) & 0x3F));
+                    } else {
+                        utf8[count++] = (byte) (0xC0 | ((ch >> 6) & 0x1F));
+                        utf8[count++] = (byte) (0x80 | ((ch >> 0) & 0x3F));
+                    }
+                }
+                try {
+                    out.write(utf8, 0, utflen + 2);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
new file mode 100644
index 0000000..cfe1b65
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputOperatorNodePushable.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public abstract class AbstractUnaryOutputOperatorNodePushable implements IOperatorNodePushable {
+    protected IFrameWriter writer;
+    protected RecordDescriptor recordDesc;
+
+    @Override
+    public final void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        this.writer = writer;
+        this.recordDesc = recordDesc;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
new file mode 100644
index 0000000..bebf7d4
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class AbstractUnaryOutputSourceOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
similarity index 88%
rename from hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileScanOperatorDescriptor.java
rename to hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
index 43d821c..cea61f1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
@@ -28,12 +28,12 @@
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
-public abstract class AbstractFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public abstract class AbstractDeserializedFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
     protected FileSplit[] splits;
 
-    public AbstractFileScanOperatorDescriptor(JobSpecification spec, FileSplit[] splits,
+    public AbstractDeserializedFileScanOperatorDescriptor(JobSpecification spec, FileSplit[] splits,
             RecordDescriptor recordDescriptor) {
         super(spec, 0, 1);
         recordDescriptors[0] = recordDescriptor;
@@ -44,11 +44,11 @@
 
     protected abstract void configure() throws Exception;
 
-    protected class FileScanOperator implements IOpenableDataWriterOperator {
+    protected class DeserializedFileScanOperator implements IOpenableDataWriterOperator {
         private IOpenableDataWriter<Object[]> writer;
         private int index;
 
-        FileScanOperator(int index) {
+        DeserializedFileScanOperator(int index) {
             this.index = index;
         }
 
@@ -102,6 +102,6 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), null);
+        return new DeserializedOperatorNodePushable(ctx, new DeserializedFileScanOperator(partition), null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/CSVFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/CSVFileScanOperatorDescriptor.java
deleted file mode 100644
index 54724a4..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/CSVFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.file;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class CSVFileScanOperatorDescriptor extends
-        AbstractFileScanOperatorDescriptor {
-    private static class CSVRecordReaderImpl implements IRecordReader {
-        private final BufferedReader in;
-        private final char separator;
-        private final String quotes;
-
-        CSVRecordReaderImpl(File file, RecordDescriptor desc, char separator,
-                String quotes) throws Exception {
-            in = new BufferedReader(new InputStreamReader(new FileInputStream(
-                    file)));
-            this.separator = separator;
-            this.quotes = quotes;
-        }
-
-        @Override
-        public void close() {
-            try {
-                in.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override
-        public boolean read(Object[] record) throws Exception {
-            String line = in.readLine();
-            if (line == null) {
-                return false;
-            }
-            int fid = 0;
-            char[] chars = line.toCharArray();
-            int i = 0;
-            boolean insideQuote = false;
-            char quoteChar = 0;
-            int partStart = 0;
-            boolean skipNext = false;
-            while (fid < record.length && i < chars.length) {
-                char c = chars[i];
-                if (!skipNext) {
-                    if (quotes.indexOf(c) >= 0) {
-                        if (insideQuote) {
-                            if (quoteChar == c) {
-                                insideQuote = false;
-                            }
-                        } else {
-                            insideQuote = true;
-                            quoteChar = c;
-                        }
-                    } else if (c == separator) {
-                        if (!insideQuote) {
-                            record[fid++] = String.valueOf(chars, partStart, i
-                                    - partStart);
-                            partStart = i + 1;
-                        }
-                    } else if (c == '\\') {
-                        skipNext = true;
-                    }
-                } else {
-                    skipNext = false;
-                }
-                ++i;
-            }
-            if (fid < record.length) {
-                record[fid] = String.valueOf(chars, partStart, i - partStart);
-            }
-            return true;
-        }
-    }
-
-    private static final long serialVersionUID = 1L;
-
-    private final char separator;
-    private final String quotes;
-
-    public CSVFileScanOperatorDescriptor(JobSpecification spec,
-            FileSplit[] splits, RecordDescriptor recordDescriptor) {
-        this(spec, splits, recordDescriptor, ',', "'\"");
-    }
-
-    public CSVFileScanOperatorDescriptor(JobSpecification spec,
-            FileSplit[] splits, RecordDescriptor recordDescriptor,
-            char separator, String quotes) {
-        super(spec, splits, recordDescriptor);
-        this.separator = separator;
-        this.quotes = quotes;
-    }
-
-    @Override
-    protected IRecordReader createRecordReader(File file, RecordDescriptor desc)
-            throws Exception {
-        return new CSVRecordReaderImpl(file, desc, separator, quotes);
-    }
-
-	@Override
-	protected void configure() throws Exception {
-		// currently a no-op, but is meant to initialize , if required before it is asked 
-		// to create a record reader
-		// this is executed at the node and is useful for operators that could not be 
-		// initialized from the client completely, because of lack of information specific 
-		// to the node where the operator gets executed. 
-		
-	}
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ConstantFileSplitProvider.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ConstantFileSplitProvider.java
new file mode 100644
index 0000000..21f5e3f
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ConstantFileSplitProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+public class ConstantFileSplitProvider implements IFileSplitProvider {
+    private static final long serialVersionUID = 1L;
+    private final FileSplit[] splits;
+
+    public ConstantFileSplitProvider(FileSplit[] splits) {
+        this.splits = splits;
+    }
+
+    @Override
+    public FileSplit[] getFileSplits() {
+        return splits;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
new file mode 100644
index 0000000..716f587
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+public class DelimitedDataTupleParserFactory implements ITupleParserFactory {
+    private static final long serialVersionUID = 1L;
+    private IValueParserFactory[] valueParserFactories;
+    private char fieldDelimiter;
+
+    public DelimitedDataTupleParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter) {
+        this.valueParserFactories = fieldParserFactories;
+        this.fieldDelimiter = fieldDelimiter;
+    }
+
+    @Override
+    public ITupleParser createTupleParser(final IHyracksContext ctx) {
+        return new ITupleParser() {
+            @Override
+            public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+                try {
+                    IValueParser[] valueParsers = new IValueParser[valueParserFactories.length];
+                    for (int i = 0; i < valueParserFactories.length; ++i) {
+                        valueParsers[i] = valueParserFactories[i].createValueParser();
+                    }
+                    ByteBuffer frame = ctx.getResourceManager().allocateFrame();
+                    FrameTupleAppender appender = new FrameTupleAppender(ctx);
+                    appender.reset(frame, true);
+                    ArrayTupleBuilder tb = new ArrayTupleBuilder(valueParsers.length);
+                    DataOutput dos = tb.getDataOutput();
+
+                    FieldCursor cursor = new FieldCursor(new InputStreamReader(in));
+                    while (cursor.nextRecord()) {
+                        tb.reset();
+                        for (int i = 0; i < valueParsers.length; ++i) {
+                            if (!cursor.nextField()) {
+                                break;
+                            }
+                            valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart, dos);
+                            tb.addFieldEndOffset();
+                        }
+                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                            FrameUtils.flushFrame(frame, writer);
+                            appender.reset(frame, true);
+                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                                throw new IllegalStateException();
+                            }
+                        }
+                    }
+                    if (appender.getTupleCount() > 0) {
+                        FrameUtils.flushFrame(frame, writer);
+                    }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+    private enum State {
+        INIT,
+        IN_RECORD,
+        EOR,
+        CR,
+        EOF
+    }
+
+    private class FieldCursor {
+        private static final int INITIAL_BUFFER_SIZE = 4096;
+        private static final int INCREMENT = 4096;
+
+        private final Reader in;
+
+        private char[] buffer;
+        private int start;
+        private int end;
+        private State state;
+
+        private int fStart;
+        private int fEnd;
+
+        public FieldCursor(Reader in) {
+            this.in = in;
+            buffer = new char[INITIAL_BUFFER_SIZE];
+            start = 0;
+            end = 0;
+            state = State.INIT;
+        }
+
+        public boolean nextRecord() throws IOException {
+            while (true) {
+                switch (state) {
+                    case INIT:
+                        boolean eof = !readMore();
+                        if (eof) {
+                            state = State.EOF;
+                            return false;
+                        } else {
+                            state = State.IN_RECORD;
+                            return true;
+                        }
+
+                    case IN_RECORD:
+                        int p = start;
+                        while (true) {
+                            if (p >= end) {
+                                int s = start;
+                                eof = !readMore();
+                                if (eof) {
+                                    state = State.EOF;
+                                    return start < end;
+                                }
+                                p -= (s - start);
+                            }
+                            char ch = buffer[p];
+                            if (ch == '\n') {
+                                start = p + 1;
+                                state = State.EOR;
+                                break;
+                            } else if (ch == '\r') {
+                                start = p + 1;
+                                state = State.CR;
+                                break;
+                            }
+                            ++p;
+                        }
+                        break;
+
+                    case CR:
+                        if (start >= end) {
+                            eof = !readMore();
+                            if (eof) {
+                                state = State.EOF;
+                                return false;
+                            }
+                        }
+                        char ch = buffer[start];
+                        if (ch == '\n') {
+                            ++start;
+                            state = State.EOR;
+                        } else {
+                            state = State.IN_RECORD;
+                            return true;
+                        }
+
+                    case EOR:
+                        if (start >= end) {
+                            eof = !readMore();
+                            if (eof) {
+                                state = State.EOF;
+                                return false;
+                            }
+                        }
+                        state = State.IN_RECORD;
+                        return start < end;
+
+                    case EOF:
+                        return false;
+                }
+            }
+        }
+
+        public boolean nextField() throws IOException {
+            switch (state) {
+                case INIT:
+                case EOR:
+                case EOF:
+                case CR:
+                    return false;
+
+                case IN_RECORD:
+                    boolean eof;
+                    int p = start;
+                    while (true) {
+                        if (p >= end) {
+                            int s = start;
+                            eof = !readMore();
+                            if (eof) {
+                                state = State.EOF;
+                                return true;
+                            }
+                            p -= (s - start);
+                        }
+                        char ch = buffer[p];
+                        if (ch == fieldDelimiter) {
+                            fStart = start;
+                            fEnd = p;
+                            start = p + 1;
+                            return true;
+                        } else if (ch == '\n') {
+                            fStart = start;
+                            fEnd = p;
+                            start = p + 1;
+                            state = State.EOR;
+                            return true;
+                        } else if (ch == '\r') {
+                            fStart = start;
+                            fEnd = p;
+                            start = p + 1;
+                            state = State.CR;
+                            return true;
+                        }
+                        ++p;
+                    }
+            }
+            throw new IllegalStateException();
+        }
+
+        private boolean readMore() throws IOException {
+            if (start > 0) {
+                System.arraycopy(buffer, start, buffer, 0, end - start);
+            }
+            end -= start;
+            start = 0;
+
+            if (end == buffer.length) {
+                buffer = Arrays.copyOf(buffer, buffer.length + INCREMENT);
+            }
+
+            int n = in.read(buffer, end, buffer.length - end);
+            if (n < 0) {
+                return false;
+            }
+            end += n;
+            return true;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
new file mode 100644
index 0000000..efc63d3
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class FileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final IFileSplitProvider fileSplitProvider;
+
+    private final ITupleParserFactory tupleParserFactory;
+
+    public FileScanOperatorDescriptor(JobSpecification spec, IFileSplitProvider fileSplitProvider,
+            ITupleParserFactory tupleParserFactory, RecordDescriptor rDesc) {
+        super(spec, 0, 1);
+        this.fileSplitProvider = fileSplitProvider;
+        this.tupleParserFactory = tupleParserFactory;
+        recordDescriptors[0] = rDesc;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+        final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+                File f = split.getLocalFile();
+                writer.open();
+                try {
+                    InputStream in;
+                    try {
+                        in = new FileInputStream(f);
+                    } catch (FileNotFoundException e) {
+                        throw new HyracksDataException(e);
+                    }
+                    tp.parse(in, writer);
+                } finally {
+                    writer.close();
+                }
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IFileSplitProvider.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IFileSplitProvider.java
new file mode 100644
index 0000000..90f65ab
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/IFileSplitProvider.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.Serializable;
+
+public interface IFileSplitProvider extends Serializable {
+    public FileSplit[] getFileSplits();
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParser.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParser.java
new file mode 100644
index 0000000..5014e41
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParser.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.InputStream;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITupleParser {
+    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParserFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParserFactory.java
new file mode 100644
index 0000000..708742b
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/ITupleParserFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.file;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public interface ITupleParserFactory extends Serializable {
+    public ITupleParser createTupleParser(IHyracksContext ctx);
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/LineFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/LineFileScanOperatorDescriptor.java
deleted file mode 100644
index 19c4ba9..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/LineFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.file;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-public class LineFileScanOperatorDescriptor extends
-        AbstractFileScanOperatorDescriptor {
-    private static class LineReaderImpl extends  RecordReader {
-        private File file;
-
-        LineReaderImpl(File file) throws Exception {
-        	super(new Object[]{file});
-        	this.file = file;
-        }
-
-		@Override
-		public InputStream createInputStream(Object[] args) throws Exception{
-			this.file = (File)args[0];
-			return new FileInputStream(file) ;
-		}
-       }
-
-    private static final long serialVersionUID = 1L;
-
-    public LineFileScanOperatorDescriptor(JobSpecification spec,
-            FileSplit[] splits, RecordDescriptor recordDescriptor) {
-        super(spec, splits, recordDescriptor);
-    }
-
-    @Override
-    protected IRecordReader createRecordReader(File file, RecordDescriptor desc)
-            throws Exception {
-        return new LineReaderImpl(file);
-    }
-    
-    @Override
-	protected void configure() throws Exception {
-		// currently a no-op, but is meant to initialize , if required before it is asked 
-		// to create a record reader
-		// this is executed at the node and is useful for operators that could not be 
-		// initialized from the client completely, because of lack of information specific 
-		// to the node where the operator gets executed. 
-		
-	}
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
index 9dd223e..aa6635d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordFileScanOperatorDescriptor.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
-public class RecordFileScanOperatorDescriptor extends AbstractFileScanOperatorDescriptor {
+public class RecordFileScanOperatorDescriptor extends AbstractDeserializedFileScanOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
     public RecordFileScanOperatorDescriptor(JobSpecification spec, FileSplit[] splits, RecordDescriptor recordDescriptor) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordReader.java
deleted file mode 100644
index 2064cc2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/RecordReader.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.file;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public  abstract class RecordReader implements IRecordReader {
-
-    private final BufferedReader bufferedReader;
-	private InputStream inputStream;
-	
-	@Override
-	public void close() {
-        try {
-        	bufferedReader.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public boolean read(Object[] record) throws Exception {
-        String line = bufferedReader.readLine();
-        if (line == null) {
-            return false;
-        }
-        record[0] = line;
-        return true;
-    }
-	
-	public abstract InputStream createInputStream(Object[] args) throws Exception;
-	
-	public RecordReader(Object[] args) throws Exception{
-		try{
-			bufferedReader = new BufferedReader(new InputStreamReader(createInputStream(args)));
-		}catch(Exception e){
-			e.printStackTrace();
-			throw e;
-		}
-	}
-	
-}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 253e2c6..b2a9f39 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -105,7 +105,7 @@
     private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
         buffer.position(0);
         buffer.limit(buffer.capacity());
-        writer.nextFrame(outBuffer);
+        writer.nextFrame(buffer);
         buffer.position(0);
         buffer.limit(buffer.capacity());
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
new file mode 100644
index 0000000..3b7b02c
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -0,0 +1,43 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public NullSinkOperatorDescriptor(JobSpecification spec) {
+        super(spec, 1, 0);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new IOperatorNodePushable() {
+            @Override
+            public void open() throws HyracksDataException {
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+            }
+
+            @Override
+            public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index ffcf92f..6ddc8ec 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -35,7 +35,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializingDataReader;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.SerializingDataWriter;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 
 public class SerializationDeserializationTest {
     private static final String DBLP_FILE = "data/dblp.txt";
@@ -129,7 +129,7 @@
     @Test
     public void serdeser01() throws Exception {
         RecordDescriptor rDes = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         LineProcessor processor = new LineProcessor() {
             @Override
             public void process(String line, IDataWriter<Object[]> writer) throws Exception {
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index 5c07a16..c06bd40 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -29,17 +29,22 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.aggregators.SumStringGroupAggregator;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
@@ -50,85 +55,60 @@
     public void countOfCountsSingleNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/words.txt"))
-        };
-        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE
-        });
+        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
+                        ','), desc);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
-            0
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, desc);
-        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
-        });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
-            0
-        }, new IComparatorFactory[] {
-            StringComparatorFactory.INSTANCE
-        }, new SumStringGroupAggregator(0), desc2);
-        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
+        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
-            1
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, desc2);
-        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
+        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
-            1
-        }, new IComparatorFactory[] {
-            StringComparatorFactory.INSTANCE
-        }, new SumStringGroupAggregator(1), desc2);
-        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
+        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                0
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                1
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -145,91 +125,62 @@
     public void countOfCountsMultiNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/words.txt"))
-        };
-        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE
-        });
+        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
+                        ','), desc);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
-            0
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, desc);
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
         PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID),
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
-        });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
-            0
-        }, new IComparatorFactory[] {
-            StringComparatorFactory.INSTANCE
-        }, new SumStringGroupAggregator(0), desc2);
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
         PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID),
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
-            1
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, desc2);
+        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
         PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
-            1
-        }, new IComparatorFactory[] {
-            StringComparatorFactory.INSTANCE
-        }, new SumStringGroupAggregator(1), desc2);
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
         PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                0
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                1
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -246,91 +197,62 @@
     public void countOfCountsExternalSortMultiNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/words.txt"))
-        };
-        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE
-        });
+        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE },
+                        ','), desc);
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
-            0
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, desc);
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
         PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID),
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
-        });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
-            0
-        }, new IComparatorFactory[] {
-            StringComparatorFactory.INSTANCE
-        }, new SumStringGroupAggregator(0), desc2);
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
         PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID),
-            new AbsoluteLocationConstraint(NC1_ID),
-            new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
-            1
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, desc2);
+        ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc2);
         PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
-            1
-        }, new IComparatorFactory[] {
-            StringComparatorFactory.INSTANCE
-        }, new SumStringGroupAggregator(1), desc2);
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
         PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                0
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                1
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index 9b4ce83..691aea8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -23,13 +23,22 @@
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 
 public class ScanPrintTest extends AbstractIntegrationTest {
@@ -37,23 +46,24 @@
     public void scanPrint01() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] {
-            new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt"))
-        };
-        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE
-        });
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+                new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt")) });
 
-        CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
         PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
-        });
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
-        });
+                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
@@ -62,4 +72,41 @@
         spec.addRoot(printer);
         runTest(spec);
     }
+
+    @Test
+    public void scanPrint02() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
+
+        IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(conn1, ordScanner, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 891fede..49070d7 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -27,14 +27,19 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningMergingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 
@@ -44,56 +49,43 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
-        };
+                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+        IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-            '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
-            1
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, ordersDesc);
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
         PartitionConstraint sortersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         sorter.setPartitionConstraint(sortersPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
         spec.connect(new MToNHashPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-            new int[] {
-                1
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }), new int[] {
-            1
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }), sorter, 0, printer, 0);
+                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new int[] { 1 }, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }),
+                sorter, 0, printer, 0);
 
         runTest(spec);
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 96e102a..5ebbf62 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -24,27 +24,36 @@
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
+    private static final boolean DEBUG = true;
+
     /*
      * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
@@ -54,51 +63,62 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
@@ -121,52 +141,63 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 4, 10, 200, 1.2,
                 new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
@@ -189,52 +220,63 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
                 new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
@@ -259,65 +301,76 @@
         FileSplit[] custSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -334,66 +387,77 @@
         FileSplit[] custSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
                 new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -410,66 +474,77 @@
         FileSplit[] custSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
                 new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -486,63 +561,74 @@
         FileSplit[] custSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
         join.setPartitionConstraint(new PartitionCountConstraint(2));
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -559,41 +645,51 @@
         FileSplit[] custSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
                 new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
@@ -607,25 +703,26 @@
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
                 new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
         PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
                 new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
 
         IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custPartConn, custScanner, 0, custMat, 0);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 2eafd76..d90f384 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -17,15 +17,20 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.CSVFileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
@@ -62,51 +67,62 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = createCustomerFileSplits();
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = createOrdersFileSplits();
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         ordScanner.setPartitionConstraint(createRRPartitionConstraint(2));
 
-        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         custScanner.setPartitionConstraint(createRRPartitionConstraint(2));
 
         InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 6000000);
+                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc,
+                6000000);
         join.setPartitionConstraint(new PartitionCountConstraint(4));
 
         RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(spec, new int[] { 6 },
                 new FieldHashPartitionComputerFactory(new int[] { 6 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE },
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                 new CountAccumulatingAggregatorFactory(), groupResultDesc, 16);
         gby.setPartitionConstraint(new PartitionCountConstraint(4));
 
@@ -115,17 +131,17 @@
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 0);
 
         IConnectorDescriptor joinGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 6 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(joinGroupConn, join, 0, gby, 0);
 
         IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);