merge hyracks_dev_next r847:977

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@978 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks-examples/btree-example/btreeclient/pom.xml
index a95d17e..e0930b2 100644
--- a/hyracks-examples/btree-example/btreeclient/pom.xml
+++ b/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   <artifactId>btreeclient</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>btree-example</artifactId>
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
index 4711e9e..5a6d8cd 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -18,21 +18,21 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -63,7 +63,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -86,7 +86,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
@@ -134,16 +134,16 @@
         // prepare insertion into primary index
         // tuples to be put into B-Tree shall have 4 fields
         int primaryFieldCount = 4;
-        ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
-        primaryTypeTraits[0] = new TypeTrait(4);
-        primaryTypeTraits[1] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        primaryTypeTraits[2] = new TypeTrait(4);
-        primaryTypeTraits[3] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
 
         // comparator factories for primary index
         IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[1];
-        primaryComparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
-        
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
         // create factories and providers for secondary B-Tree
         TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
         ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(primaryTupleWriterFactory);
@@ -168,15 +168,15 @@
         // prepare insertion into secondary index
         // tuples to be put into B-Tree shall have 2 fields
         int secondaryFieldCount = 2;
-        ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
-        secondaryTypeTraits[0] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        secondaryTypeTraits[1] = new TypeTrait(4);
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = IntegerPointable.TYPE_TRAITS;
 
         // comparator factories for secondary index
         IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
-        secondaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-        secondaryComparatorFactories[1] = IntegerBinaryComparatorFactory.INSTANCE;
-        
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
         // create factories and providers for secondary B-Tree
         TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(secondaryTypeTraits);
         ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
@@ -202,7 +202,7 @@
         // distribute the records from the datagen via hashing to the bulk load
         // ops
         IBinaryHashFunctionFactory[] hashFactories = new IBinaryHashFunctionFactory[1];
-        hashFactories[0] = UTF8StringBinaryHashFunctionFactory.INSTANCE;
+        hashFactories[0] = PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY);
         IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 }, hashFactories));
 
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 9790496..0b1cd6c 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -17,20 +17,21 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
@@ -60,7 +61,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -83,7 +84,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
@@ -130,18 +131,18 @@
         int[] sortFields = { 2 };
         // comparators for sort fields
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+        comparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, options.sbSize, sortFields,
                 comparatorFactories, recDesc);
         JobHelper.createPartitionConstraint(spec, sorter, splitNCs);
 
         // tuples to be put into B-Tree shall have 4 fields
         int fieldCount = 4;
-        ITypeTrait[] typeTraits = new ITypeTrait[fieldCount];
-        typeTraits[0] = new TypeTrait(4);
-        typeTraits[1] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        typeTraits[2] = new TypeTrait(4);
-        typeTraits[3] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
+        ITypeTraits[] typeTraits = new ITypeTraits[fieldCount];
+        typeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        typeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
 
         // create factories and providers for B-Tree
         TypeAwareTupleWriterFactory tupleWriterFactory = new TypeAwareTupleWriterFactory(typeTraits);
@@ -165,7 +166,7 @@
         // distribute the records from the datagen via hashing to the bulk load
         // ops
         IBinaryHashFunctionFactory[] hashFactories = new IBinaryHashFunctionFactory[1];
-        hashFactories[0] = UTF8StringBinaryHashFunctionFactory.INSTANCE;
+        hashFactories[0] = PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY);
         IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 }, hashFactories));
 
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
index 4929243..aa73a62 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexSearchExample.java
@@ -19,18 +19,19 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -59,7 +60,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -76,7 +77,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
@@ -95,16 +96,16 @@
         String[] splitNCs = options.ncs.split(",");
 
         int fieldCount = 4;
-        ITypeTrait[] typeTraits = new ITypeTrait[fieldCount];
-        typeTraits[0] = new TypeTrait(4);
-        typeTraits[1] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        typeTraits[2] = new TypeTrait(4);
-        typeTraits[3] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
+        ITypeTraits[] typeTraits = new ITypeTraits[fieldCount];
+        typeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        typeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        typeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
 
         // comparators for btree
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
-        comparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
-        
+        comparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
         // create factories and providers for B-Tree
         TypeAwareTupleWriterFactory tupleWriterFactory = new TypeAwareTupleWriterFactory(typeTraits);
         ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(tupleWriterFactory);
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index d77eb01..ad9ad3a 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -17,17 +17,17 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -56,7 +56,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -79,7 +79,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
@@ -111,17 +111,17 @@
                 IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         int primaryFieldCount = 4;
-        ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
-        primaryTypeTraits[0] = new TypeTrait(4);
-        primaryTypeTraits[1] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        primaryTypeTraits[2] = new TypeTrait(4);
-        primaryTypeTraits[3] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
 
         // comparators for sort fields and BTree fields
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
-        comparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-        comparatorFactories[1] = IntegerBinaryComparatorFactory.INSTANCE;
-        
+        comparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        comparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
         // create factories and providers for primary B-Tree
         TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
         ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(primaryTupleWriterFactory);
@@ -144,9 +144,9 @@
 
         // tuples to be put into B-Tree shall have 2 fields
         int secondaryFieldCount = 2;
-        ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
-        secondaryTypeTraits[0] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        secondaryTypeTraits[1] = new TypeTrait(4);
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = IntegerPointable.TYPE_TRAITS;
 
         // create factories and providers for secondary B-Tree
         TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(secondaryTypeTraits);
diff --git a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
index 3a79df6..2719397 100644
--- a/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
+++ b/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexSearchExample.java
@@ -19,19 +19,19 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.TypeTrait;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -60,7 +60,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -80,7 +80,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(options);
 
@@ -106,19 +106,19 @@
                 UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int secondaryFieldCount = 2;
-        ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
-        secondaryTypeTraits[0] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        secondaryTypeTraits[1] = new TypeTrait(4);
+        ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = IntegerPointable.TYPE_TRAITS;
 
         // comparators for sort fields and BTree fields
         IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[2];
-        secondaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-        secondaryComparatorFactories[1] = IntegerBinaryComparatorFactory.INSTANCE;
-        
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
         // comparators for primary index
         IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[1];
-        primaryComparatorFactories[1] = IntegerBinaryComparatorFactory.INSTANCE;
-        
+        primaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
         // create factories and providers for secondary B-Tree
         TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(secondaryTypeTraits);
         ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
@@ -131,11 +131,11 @@
                 IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE, });
 
         int primaryFieldCount = 4;
-        ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
-        primaryTypeTraits[0] = new TypeTrait(4);
-        primaryTypeTraits[1] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
-        primaryTypeTraits[2] = new TypeTrait(4);
-        primaryTypeTraits[3] = new TypeTrait(ITypeTrait.VARIABLE_LENGTH);
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
 
         // create factories and providers for secondary B-Tree
         TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -147,13 +147,13 @@
         // i.e. we will have a range condition on the first field only (implying
         // [-infinity, +infinity] for the second field)
         IBinaryComparatorFactory[] searchComparatorFactories = new IBinaryComparatorFactory[1];
-        searchComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-        
+        searchComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+
         // build tuple containing low and high search keys
         ArrayTupleBuilder tb = new ArrayTupleBuilder(searchComparatorFactories.length * 2); // low
-                                                                                      // and
-                                                                                      // high
-                                                                                      // key
+        // and
+        // high
+        // key
         DataOutput dos = tb.getDataOutput();
 
         tb.reset();
diff --git a/hyracks-examples/btree-example/btreehelper/pom.xml b/hyracks-examples/btree-example/btreehelper/pom.xml
index 2615dc2..046d762 100644
--- a/hyracks-examples/btree-example/btreehelper/pom.xml
+++ b/hyracks-examples/btree-example/btreehelper/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.btree</groupId>
   <artifactId>btreehelper</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>btree-example</artifactId>
@@ -29,6 +27,11 @@
   		<version>0.2.0-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-data-std</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index 35deb57..36e315d 100644
--- a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -58,8 +58,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-            int partition, int nPartitions) {
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
 
         final ByteBuffer outputFrame = ctx.allocateFrame();
         final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -82,7 +82,7 @@
                     appender.reset(outputFrame, true);
                     for (int i = 0; i < numRecords; i++) {
                         tb.reset();
-                        for (int j = 0; j < recDesc.getFields().length; j++) {
+                        for (int j = 0; j < recDesc.getFieldCount(); j++) {
                             genField(tb, j);
                         }
 
diff --git a/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks-examples/hyracks-integration-tests/pom.xml
index 7012cde..d74b384 100644
--- a/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>hyracks-integration-tests</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
@@ -80,5 +78,10 @@
   		<type>jar</type>
   		<scope>test</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-data-std</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
index 38dd051..6ef1740 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -76,7 +77,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -98,13 +99,13 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
     }
@@ -131,7 +132,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
index a043675..51dbb69 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -76,7 +77,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -98,13 +99,13 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
     }
@@ -131,7 +132,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
index 51d3db1..a804bae 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
@@ -27,11 +27,12 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -73,7 +74,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -95,13 +96,13 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
     }
@@ -128,7 +129,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
index 3b6b202..a16bbe4 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -79,7 +80,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -100,7 +101,7 @@
 
     // field, type and key declarations for secondary indexes
     private int secondaryFieldCount = 2;
-    private ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
+    private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
     private int secondaryKeyFieldCount = 2;
     private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
     private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
@@ -121,19 +122,19 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         // field, type and key declarations for secondary indexes
-        secondaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        secondaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        secondaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-        secondaryComparatorFactories[1] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
         loadSecondaryIndexTest();
@@ -162,7 +163,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
@@ -211,7 +213,8 @@
 
         // sort based on secondary keys
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 3, 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, primaryRecDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                primaryRecDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         // load secondary index
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
index 1e070a8..ef1c6f6 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -62,308 +63,238 @@
 import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
 import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
 
-public class BTreeSecondaryIndexSearchOperatorTest extends
-		AbstractIntegrationTest {
-	static {
-		TestStorageManagerComponentHolder.init(8192, 20, 20);
-	}
+public class BTreeSecondaryIndexSearchOperatorTest extends AbstractIntegrationTest {
+    static {
+        TestStorageManagerComponentHolder.init(8192, 20, 20);
+    }
 
-	private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
-	private IIndexRegistryProvider<IIndex> indexRegistryProvider = new TestIndexRegistryProvider();
-	private IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
+    private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
+    private IIndexRegistryProvider<IIndex> indexRegistryProvider = new TestIndexRegistryProvider();
+    private IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
 
-	private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat(
-			"ddMMyy-hhmmssSS");
-	private final static String sep = System.getProperty("file.separator");
+    private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+    private final static String sep = System.getProperty("file.separator");
 
-	// field, type and key declarations for primary index
-	private int primaryFieldCount = 6;
-	private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
-	private int primaryKeyFieldCount = 1;
-	private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-	private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
-			primaryTypeTraits);
-	private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-			primaryTupleWriterFactory);
-	private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(
-			primaryTupleWriterFactory);
+    // field, type and key declarations for primary index
+    private int primaryFieldCount = 6;
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+    private int primaryKeyFieldCount = 1;
+    private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
+    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
+    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
+            primaryTupleWriterFactory);
+    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
 
-	private static String primaryBtreeName = "primary"
-			+ simpleDateFormat.format(new Date());
-	private static String primaryFileName = System
-			.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
+    private static String primaryBtreeName = "primary" + simpleDateFormat.format(new Date());
+    private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
 
-	private IFileSplitProvider primaryBtreeSplitProvider = new ConstantFileSplitProvider(
-			new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-					primaryFileName))) });
+    private IFileSplitProvider primaryBtreeSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
 
-	private RecordDescriptor primaryRecDesc = new RecordDescriptor(
-			new ISerializerDeserializer[] {
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE });
+    private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-	// field, type and key declarations for secondary indexes
-	private int secondaryFieldCount = 2;
-	private ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
-	private int secondaryKeyFieldCount = 2;
-	private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
-	private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
-			secondaryTypeTraits);
-	private ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-			secondaryTupleWriterFactory);
-	private ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(
-			secondaryTupleWriterFactory);
+    // field, type and key declarations for secondary indexes
+    private int secondaryFieldCount = 2;
+    private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+    private int secondaryKeyFieldCount = 2;
+    private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
+    private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
+            secondaryTypeTraits);
+    private ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
+            secondaryTupleWriterFactory);
+    private ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(secondaryTupleWriterFactory);
 
-	private static String secondaryBtreeName = "secondary"
-			+ simpleDateFormat.format(new Date());
-	private static String secondaryFileName = System
-			.getProperty("java.io.tmpdir") + sep + secondaryBtreeName;
+    private static String secondaryBtreeName = "secondary" + simpleDateFormat.format(new Date());
+    private static String secondaryFileName = System.getProperty("java.io.tmpdir") + sep + secondaryBtreeName;
 
-	private IFileSplitProvider secondaryBtreeSplitProvider = new ConstantFileSplitProvider(
-			new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-					secondaryFileName))) });
+    private IFileSplitProvider secondaryBtreeSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(secondaryFileName))) });
 
-	private RecordDescriptor secondaryRecDesc = new RecordDescriptor(
-			new ISerializerDeserializer[] {
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE });
+    private RecordDescriptor secondaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-	@Before
-	public void setup() throws Exception {
-		// field, type and key declarations for primary index
-		primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+    @Before
+    public void setup() throws Exception {
+        // field, type and key declarations for primary index
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
-		// field, type and key declarations for secondary indexes
-		secondaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		secondaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		secondaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-		secondaryComparatorFactories[1] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        // field, type and key declarations for secondary indexes
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
-		loadPrimaryIndexTest();
-		loadSecondaryIndexTest();
-	}
+        loadPrimaryIndexTest();
+        loadSecondaryIndexTest();
+    }
 
-	public void loadPrimaryIndexTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    public void loadPrimaryIndexTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID,
-				new FileReference(new File("data/tpch0.001/orders-part1.tbl"))) };
-		IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(
-				ordersSplits);
-		RecordDescriptor ordersDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/orders-part1.tbl"))) };
+        IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-		FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
-				spec, ordersSplitProvider, new DelimitedDataTupleParserFactory(
-						new IValueParserFactory[] {
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE }, '|'),
-				ordersDesc);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				ordScanner, NC1_ID);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec,
-				1000,
-				new int[] { 0 },
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				ordersDesc);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-				NC1_ID);
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
-		int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
-		TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManager, indexRegistryProvider,
-				primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-				primaryLeafFrameFactory, primaryTypeTraits,
-				primaryComparatorFactories, fieldPermutation, 0.7f,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				primaryBtreeBulkLoad, NC1_ID);
+        int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
+        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
+                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0,
-				sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
-				primaryBtreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, primaryBtreeBulkLoad, 0);
 
-		spec.addRoot(primaryBtreeBulkLoad);
-		runTest(spec);
-	}
+        spec.addRoot(primaryBtreeBulkLoad);
+        runTest(spec);
+    }
 
-	public void loadSecondaryIndexTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    public void loadSecondaryIndexTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		// build dummy tuple containing nothing
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
-		DataOutput dos = tb.getDataOutput();
+        // build dummy tuple containing nothing
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
+        DataOutput dos = tb.getDataOutput();
 
-		tb.reset();
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-		tb.addFieldEndOffset();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
 
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
 
-		ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				keyProviderOp, NC1_ID);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
 
-		int[] lowKeyFields = null; // - infinity
-		int[] highKeyFields = null; // + infinity
+        int[] lowKeyFields = null; // - infinity
+        int[] highKeyFields = null; // + infinity
 
-		// scan primary index
-		BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc, storageManager,
-				indexRegistryProvider, primaryBtreeSplitProvider,
-				primaryInteriorFrameFactory, primaryLeafFrameFactory,
-				primaryTypeTraits, primaryComparatorFactories, true,
-				lowKeyFields, highKeyFields, true, true, dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				primaryBtreeSearchOp, NC1_ID);
+        // scan primary index
+        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
+                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
-		// sort based on secondary keys
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec,
-				1000,
-				new int[] { 3, 0 },
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				primaryRecDesc);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-				NC1_ID);
+        // sort based on secondary keys
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 3, 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                primaryRecDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
-		// load secondary index
-		int[] fieldPermutation = { 3, 0 };
-		TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManager, indexRegistryProvider,
-				secondaryBtreeSplitProvider, secondaryInteriorFrameFactory,
-				secondaryLeafFrameFactory, secondaryTypeTraits,
-				secondaryComparatorFactories, fieldPermutation, 0.7f,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				secondaryBtreeBulkLoad, NC1_ID);
+        // load secondary index
+        int[] fieldPermutation = { 3, 0 };
+        TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, indexRegistryProvider, secondaryBtreeSplitProvider, secondaryInteriorFrameFactory,
+                secondaryLeafFrameFactory, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0,
-				primaryBtreeSearchOp, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec),
-				primaryBtreeSearchOp, 0, sorter, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
-				secondaryBtreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, secondaryBtreeBulkLoad, 0);
 
-		spec.addRoot(secondaryBtreeBulkLoad);
-		runTest(spec);
-	}
+        spec.addRoot(secondaryBtreeBulkLoad);
+        runTest(spec);
+    }
 
-	@Test
-	public void searchSecondaryIndexTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    @Test
+    public void searchSecondaryIndexTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		// build tuple containing search keys (only use the first key as search
-		// key)
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(secondaryKeyFieldCount);
-		DataOutput dos = tb.getDataOutput();
+        // build tuple containing search keys (only use the first key as search
+        // key)
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(secondaryKeyFieldCount);
+        DataOutput dos = tb.getDataOutput();
 
-		tb.reset();
-		// low key
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("1998-07-21", dos);
-		tb.addFieldEndOffset();
-		// high key
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("2000-10-18", dos);
-		tb.addFieldEndOffset();
+        tb.reset();
+        // low key
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("1998-07-21", dos);
+        tb.addFieldEndOffset();
+        // high key
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("2000-10-18", dos);
+        tb.addFieldEndOffset();
 
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
 
-		ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				keyProviderOp, NC1_ID);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
 
-		int[] secondaryLowKeyFields = { 0 };
-		int[] secondaryHighKeyFields = { 1 };
+        int[] secondaryLowKeyFields = { 0 };
+        int[] secondaryHighKeyFields = { 1 };
 
-		// search secondary index
-		BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(
-				spec, secondaryRecDesc, storageManager,
-				indexRegistryProvider, secondaryBtreeSplitProvider,
-				secondaryInteriorFrameFactory, secondaryLeafFrameFactory,
-				secondaryTypeTraits, secondaryComparatorFactories, true,
-				secondaryLowKeyFields, secondaryHighKeyFields, true, true,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				secondaryBtreeSearchOp, NC1_ID);
+        // search secondary index
+        BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
+                secondaryRecDesc, storageManager, indexRegistryProvider, secondaryBtreeSplitProvider,
+                secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
+                secondaryComparatorFactories, true, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
 
-		int[] primaryLowKeyFields = { 1 }; // second field from the tuples
-		// coming from secondary index
-		int[] primaryHighKeyFields = { 1 }; // second field from the tuples
-		// coming from secondary index
+        int[] primaryLowKeyFields = { 1 }; // second field from the tuples
+        // coming from secondary index
+        int[] primaryHighKeyFields = { 1 }; // second field from the tuples
+        // coming from secondary index
 
-		// search primary index
-		BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc, storageManager,
-				indexRegistryProvider, primaryBtreeSplitProvider,
-				primaryInteriorFrameFactory, primaryLeafFrameFactory,
-				primaryTypeTraits, primaryComparatorFactories, true,
-				primaryLowKeyFields, primaryHighKeyFields, true, true,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				primaryBtreeSearchOp, NC1_ID);
+        // search primary index
+        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
+                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, primaryLowKeyFields,
+                primaryHighKeyFields, true, true, dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
                 createTempFile().getAbsolutePath()) });
         IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0,
-				secondaryBtreeSearchOp, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec),
-				secondaryBtreeSearchOp, 0, primaryBtreeSearchOp, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec),
-				primaryBtreeSearchOp, 0, printer, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBtreeSearchOp, 0, primaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	@AfterClass
-	public static void cleanup() throws Exception {
-		File primary = new File(primaryFileName);
-		primary.deleteOnExit();
-		File secondary = new File(secondaryFileName);
-		secondary.deleteOnExit();
-	}
+    @AfterClass
+    public static void cleanup() throws Exception {
+        File primary = new File(primaryFileName);
+        primary.deleteOnExit();
+        File secondary = new File(secondaryFileName);
+        secondary.deleteOnExit();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index abedf4a..fac141c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -28,7 +28,7 @@
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
-import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -61,7 +61,10 @@
     @BeforeClass
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.port = 39001;
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = 39000;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = 39001;
         ccConfig.profileDumpPeriod = 10000;
         File outDir = new File("target/ClusterController");
         outDir.mkdirs();
@@ -75,6 +78,7 @@
         NCConfig ncConfig1 = new NCConfig();
         ncConfig1.ccHost = "localhost";
         ncConfig1.ccPort = 39001;
+        ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
@@ -83,12 +87,13 @@
         NCConfig ncConfig2 = new NCConfig();
         ncConfig2.ccHost = "localhost";
         ncConfig2.ccPort = 39001;
+        ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
-        hcc = new HyracksLocalConnection(cc);
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
         hcc.createApplication("test", null);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 042c919..1c5c014 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -27,8 +27,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -59,61 +60,40 @@
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 
 /**
  *
  */
 public class AggregationTests extends AbstractIntegrationTest {
 
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-             "data/tpch0.001/lineitem.tbl"))) });
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
+            new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
 
-    final RecordDescriptor desc = new RecordDescriptor(
-            new ISerializerDeserializer[] {
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE });
+    final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
-            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE, }, '|');
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+            UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, }, '|');
 
-    private AbstractSingleActivityOperatorDescriptor getPrinter(
-            JobSpecification spec, String prefix) throws IOException {
+    private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
+            throws IOException {
 
-        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
-                spec, new ConstantFileSplitProvider(new FileSplit[] {
-                        new FileSplit(NC1_ID, createTempFile()
-                                .getAbsolutePath()),
-                        new FileSplit(NC2_ID, createTempFile()
-                                .getAbsolutePath()) }), "\t");
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
+                new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit(NC1_ID, createTempFile().getAbsolutePath()),
+                        new FileSplit(NC2_ID, createTempFile().getAbsolutePath()) }), "\t");
 
         return printer;
     }
@@ -122,49 +102,38 @@
     public void singleKeySumInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -177,64 +146,34 @@
     public void singleKeySumPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        int[] keyFields = new int[] { 0 };
+
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -247,58 +186,41 @@
     public void singleKeySumExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(3, false) }),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(2, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -311,51 +233,38 @@
     public void singleKeyAvgInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -368,66 +277,34 @@
     public void singleKeyAvgPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
 
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                desc);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -440,61 +317,43 @@
     public void singleKeyAvgExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new CountFieldAggregatorFactory(false),
-                                new AvgFieldGroupAggregatorFactory(1, false) }),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
                                 new IntSumFieldAggregatorFactory(2, false),
-                                new AvgFieldMergeAggregatorFactory(3, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+                                new AvgFieldMergeAggregatorFactory(3, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -507,49 +366,38 @@
     public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -562,64 +410,34 @@
     public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec);
+        int[] keyFields = new int[] { 0 };
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -632,60 +450,42 @@
     public void singleKeyMinMaxStringExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, true) }),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(2, true,
-                                        true) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -698,53 +498,39 @@
     public void multiKeySumInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -757,67 +543,35 @@
     public void multiKeySumPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 8, 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE},
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec, keyFields, new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        int[] keyFields = new int[] { 8, 0 };
+
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -830,64 +584,43 @@
     public void multiKeySumExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int[] keys = new int[] { 0, 1 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(keyFields,
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(3, false) }),
-                new MultiFieldsAggregatorFactory(keys,
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(2, false),
-                                new IntSumFieldAggregatorFactory(3, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeySumExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -900,55 +633,40 @@
     public void multiKeyAvgInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -961,69 +679,36 @@
     public void multiKeyAvgPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 8, 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE},
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec, keyFields, new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec);
+        int[] keyFields = new int[] { 8, 0 };
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyAvgInmemGroupTest");
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1036,66 +721,46 @@
     public void multiKeyAvgExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new CountFieldAggregatorFactory(false),
-                                new AvgFieldGroupAggregatorFactory(1, false) }),
-                new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(2, false),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
                                 new IntSumFieldAggregatorFactory(3, false),
-                                new AvgFieldMergeAggregatorFactory(4, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
+                                new AvgFieldMergeAggregatorFactory(4, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1108,53 +773,39 @@
     public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyMinMaxStringInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1167,67 +818,35 @@
     public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 8, 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE},
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec, keyFields, new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec);
+        int[] keyFields = new int[] { 8, 0 };
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyMinMaxStringPreClusterGroupTest");
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringPreClusterGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1240,65 +859,44 @@
     public void multiKeyMinMaxStringExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, true) }),
-                new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(2, false),
-                                new MinMaxStringFieldAggregatorFactory(3, true,
-                                        true) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                new MultiFieldsAggregatorFactory(new int[] { 0, 1 }, new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(2, false),
+                        new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyMinMaxStringExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index 1c96755..ce72ec5 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -27,9 +27,10 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -69,7 +70,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC1_ID);
 
         InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                desc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -77,19 +79,19 @@
         PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
                 spec,
                 new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter2, NC1_ID);
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID);
 
@@ -100,7 +102,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -108,7 +111,8 @@
 
         IConnectorDescriptor conn3 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -138,7 +142,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC1_ID);
 
         InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                desc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -146,19 +151,19 @@
         PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
                 spec,
                 new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter2, NC1_ID, NC2_ID);
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
@@ -169,7 +174,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -177,7 +183,8 @@
 
         IConnectorDescriptor conn3 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -207,7 +214,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                desc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -215,19 +223,19 @@
         PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
                 spec,
                 new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter2, NC1_ID, NC2_ID);
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
@@ -238,7 +246,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -246,7 +255,8 @@
 
         IConnectorDescriptor conn3 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index 55a0c1c..9355110 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -26,8 +26,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -104,7 +105,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, ordScanner, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -140,7 +142,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { IntegerBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(IntegerPointable.FACTORY) }));
         spec.connect(conn1, ordScanner, 0, printer, 0);
 
         spec.addRoot(printer);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 57888c8..2c3fddf 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -26,8 +26,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -68,7 +69,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -78,10 +80,12 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 1 }, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }),
-                sorter, 0, printer, 0);
+        spec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                        new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), new int[] { 1 },
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) }), sorter, 0, printer, 0);
 
         runTest(spec);
     }
@@ -110,8 +114,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 4, new int[] { 1, 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -121,11 +125,15 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                new int[] { 1, 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE }), new int[] { 1, 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }), sorter, 0, printer, 0);
+        spec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
+                        1, 0 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
+                        new IBinaryComparatorFactory[] {
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
+                printer, 0);
 
         runTest(spec);
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 2fbf6d2..61d4696 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -32,8 +32,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -138,9 +139,13 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -211,10 +216,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 4, 10, 200, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
+                spec,
+                4,
+                10,
+                200,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -285,10 +297,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
+                spec,
+                5,
+                20,
+                200,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -359,15 +378,18 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
         for (int j = 0; j < nullWriterFactories.length; j++) {
             nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
         }
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
-                nullWriterFactories, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 0 },
+                new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, true, nullWriterFactories, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -438,16 +460,22 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
         for (int j = 0; j < nullWriterFactories.length; j++) {
             nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
         }
 
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
-                new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
-                nullWriterFactories);
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
+                spec,
+                5,
+                20,
+                200,
+                1.2,
+                new int[] { 0 },
+                new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -518,16 +546,22 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
         for (int j = 0; j < nullWriterFactories.length; j++) {
             nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
         }
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
-                new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
-                nullWriterFactories);
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
+                spec,
+                5,
+                20,
+                200,
+                1.2,
+                new int[] { 0 },
+                new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -600,9 +634,13 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -612,12 +650,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -679,10 +719,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
+                spec,
+                3,
+                20,
+                100,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -692,12 +739,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -759,10 +808,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
+                spec,
+                3,
+                20,
+                100,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -772,12 +828,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -839,9 +897,13 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -851,12 +913,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -924,9 +988,13 @@
         MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custMat, NC1_ID, NC2_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -936,12 +1004,14 @@
 
         IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
 
         IConnectorDescriptor custPartConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custPartConn, custScanner, 0, custMat, 0);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 135f1fe..6411390 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -30,7 +30,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -163,7 +164,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 4);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 4);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -237,7 +238,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 5);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -311,7 +312,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 6);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
index 6440db3..298c146 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
@@ -27,14 +27,15 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -99,7 +100,7 @@
 
     // Primary BTree index.
     private int primaryFieldCount = 2;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -110,12 +111,12 @@
             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
     // Inverted index BTree dictionary.
-    private ITypeTrait[] tokenTypeTraits = new ITypeTrait[1];
+    private ITypeTraits[] tokenTypeTraits = new ITypeTraits[1];
     private IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[1];
 
     // Inverted index stuff.
     private int invListElementFieldCount = 1;
-    private ITypeTrait[] invListsTypeTraits = new ITypeTrait[invListElementFieldCount];
+    private ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
     private IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
     private RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
@@ -130,17 +131,17 @@
     @Before
     public void setup() throws Exception {
         // Field declarations and comparators for primary BTree index.
-        primaryTypeTraits[0] = ITypeTrait.INTEGER_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
         // Field declarations and comparators for tokens.
-        tokenTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        tokenComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        tokenTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        tokenComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         // Field declarations and comparators for inverted lists.
-        invListsTypeTraits[0] = ITypeTrait.INTEGER_TYPE_TRAIT;
-        invListsComparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+        invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
         loadPrimaryIndex();
         printPrimaryIndex();
@@ -232,8 +233,8 @@
     private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
             RecordDescriptor outputRecDesc) {
         ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, outputRecDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, outputRecDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
         return externalSortOp;
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
index 74dc4a8..0c0df42 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
@@ -28,12 +28,14 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.DoubleBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
@@ -80,7 +82,7 @@
     // field, type and key declarations for primary R-tree index
     private int primaryFieldCount = 5;
     private int primaryKeyFieldCount = 4;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
 
     private RTreeTypeAwareTupleWriterFactory primaryTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(
@@ -103,18 +105,18 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary R-tree index
-        primaryTypeTraits[0] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = DoubleBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY);
         primaryComparatorFactories[1] = primaryComparatorFactories[0];
         primaryComparatorFactories[2] = primaryComparatorFactories[0];
         primaryComparatorFactories[3] = primaryComparatorFactories[0];
 
         IPrimitiveValueProviderFactory[] primaryValueProviderFactories = RTreeUtils
-                .comparatorFactoriesToPrimitiveValueProviderFactories(primaryComparatorFactories);
+                .createPrimitiveValueProviderFactories(primaryComparatorFactories.length, DoublePointable.FACTORY);
 
         primaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(primaryTupleWriterFactory,
                 primaryValueProviderFactories);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
index 62fc150..a4912e8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
@@ -27,11 +27,12 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.DoubleBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
@@ -77,7 +78,7 @@
     // field, type and key declarations for primary R-tree index
     private int primaryFieldCount = 5;
     private int primaryKeyFieldCount = 4;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
 
     private RTreeTypeAwareTupleWriterFactory primaryTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(
@@ -100,18 +101,18 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary R-tree index
-        primaryTypeTraits[0] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = DoubleBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = DoublePointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY);
         primaryComparatorFactories[1] = primaryComparatorFactories[0];
         primaryComparatorFactories[2] = primaryComparatorFactories[0];
         primaryComparatorFactories[3] = primaryComparatorFactories[0];
 
         IPrimitiveValueProviderFactory[] primaryValueProviderFactories = RTreeUtils
-                .comparatorFactoriesToPrimitiveValueProviderFactories(primaryComparatorFactories);
+                .createPrimitiveValueProviderFactories(primaryComparatorFactories.length, DoublePointable.FACTORY);
 
         primaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(primaryTupleWriterFactory,
                 primaryValueProviderFactories);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 300a726..025f675 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -28,13 +28,14 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.DoubleBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
@@ -88,7 +89,7 @@
 
     // field, type and key declarations for primary B-tree index
     private int primaryBTreeFieldCount = 10;
-    private ITypeTrait[] primaryBTreeTypeTraits = new ITypeTrait[primaryBTreeFieldCount];
+    private ITypeTraits[] primaryBTreeTypeTraits = new ITypeTraits[primaryBTreeFieldCount];
     private int primaryBTreeKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryBTreeComparatorFactories = new IBinaryComparatorFactory[primaryBTreeKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryBTreeTupleWriterFactory = new TypeAwareTupleWriterFactory(
@@ -113,7 +114,7 @@
 
     // field, type and key declarations for secondary indexes
     private int secondaryFieldCount = 5;
-    private ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
+    private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
     private int secondaryKeyFieldCount = 4;
     private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
     private IPrimitiveValueProviderFactory[] secondaryValueProviderFactories = new IPrimitiveValueProviderFactory[secondaryKeyFieldCount];
@@ -138,25 +139,25 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary B-tree index
-        primaryBTreeTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[6] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeTypeTraits[7] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeTypeTraits[8] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeTypeTraits[9] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryBTreeTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[6] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[7] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[8] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[9] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         // field, type and key declarations for secondary indexes
-        secondaryTypeTraits[0] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[1] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[2] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[3] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        secondaryComparatorFactories[0] = DoubleBinaryComparatorFactory.INSTANCE;
+        secondaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[2] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[3] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY);
         secondaryComparatorFactories[1] = secondaryComparatorFactories[0];
         secondaryComparatorFactories[2] = secondaryComparatorFactories[0];
         secondaryComparatorFactories[3] = secondaryComparatorFactories[0];
@@ -166,7 +167,7 @@
         secondaryValueProviderFactories[3] = secondaryValueProviderFactories[0];
 
         IPrimitiveValueProviderFactory[] secondaryValueProviderFactories = RTreeUtils
-                .comparatorFactoriesToPrimitiveValueProviderFactories(secondaryComparatorFactories);
+                .createPrimitiveValueProviderFactories(secondaryComparatorFactories.length, DoublePointable.FACTORY);
 
         secondaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(secondaryTupleWriterFactory,
                 secondaryValueProviderFactories);
@@ -203,7 +204,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
index 8db3f48..66f7efc 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/ExternalGroupClient.java
@@ -19,7 +19,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -31,8 +31,9 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -53,11 +54,9 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
@@ -98,7 +97,7 @@
 
         @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
         public boolean outPlain = true;
-        
+
         @Option(name = "-algo", usage = "The algorithm to be used", required = true)
         public int algo;
     }
@@ -111,17 +110,15 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
-                options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job;
 
         for (int i = 0; i < 6; i++) {
             long start = System.currentTimeMillis();
-            job = createJob(parseFileSplits(options.inFileSplits),
-                    parseFileSplits(options.outFileSplits, i),
-                    options.htSize, options.sbSize, options.framesLimit,
-                    options.sortOutput, options.algo, options.outPlain);
+            job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits, i),
+                    options.htSize, options.sbSize, options.framesLimit, options.sortOutput, options.algo,
+                    options.outPlain);
 
             System.out.print(i + "\t" + (System.currentTimeMillis() - start));
             start = System.currentTimeMillis();
@@ -139,11 +136,9 @@
             String s = splits[i].trim();
             int idx = s.indexOf(':');
             if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s
-                        + " not well formed");
+                throw new IllegalArgumentException("File split " + s + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
-                    new File(s.substring(idx + 1))));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
         }
         return fSplits;
     }
@@ -155,70 +150,46 @@
             String s = splits[i].trim();
             int idx = s.indexOf(':');
             if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s
-                        + " not well formed");
+                throw new IllegalArgumentException("File split " + s + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
-                    new File(s.substring(idx + 1) + "_" + count)));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
+                    + count)));
         }
         return fSplits;
     }
 
-    private static JobSpecification createJob(FileSplit[] inSplits,
-            FileSplit[] outSplits, int htSize, int sbSize, int framesLimit,
-            boolean sortOutput, int alg, boolean outPlain) {
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, int sbSize,
+            int framesLimit, boolean sortOutput, int alg, boolean outPlain) {
         JobSpecification spec = new JobSpecification();
-        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(
-                inSplits);
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
 
-        RecordDescriptor inDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor inDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(
-                spec, splitsProvider, new DelimitedDataTupleParserFactory(
-                        new IValueParserFactory[] {
-                                IntegerParserFactory.INSTANCE,
-                                IntegerParserFactory.INSTANCE,
-                                IntegerParserFactory.INSTANCE,
-                                IntegerParserFactory.INSTANCE,
-                                IntegerParserFactory.INSTANCE,
-                                FloatParserFactory.INSTANCE,
-                                FloatParserFactory.INSTANCE,
-                                FloatParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, }, '|'),
-                inDesc);
+        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+                        IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                        FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, }, '|'), inDesc);
 
         createPartitionConstraint(spec, fileScanner, inSplits);
 
         // Output: each unique string with an integer count
-        RecordDescriptor outDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        IntegerSerializerDeserializer.INSTANCE,
-                        // IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE,
+                // IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
         // Specify the grouping key, which will be the string extracted during
         // the scan.
@@ -229,156 +200,126 @@
         AbstractOperatorDescriptor grouper;
 
         switch (alg) {
-        case 0: // new external hash graph
-            grouper = new ExternalGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    framesLimit,
-                    new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE },
-                    new IntegerNormalizedKeyComputerFactory(),
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(
-                                    false) }),
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(
-                                    keys.length, false) }), outDesc,
-                    new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keys,
-                                    new IBinaryHashFunctionFactory[] {
-                                    // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                    IntegerBinaryHashFunctionFactory.INSTANCE }),
-                            htSize), false);
-            
-            createPartitionConstraint(spec, grouper, outSplits);
+            case 0: // new external hash graph
+                grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(spec, keys,
+                        framesLimit, new IBinaryComparatorFactory[] {
+                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                        new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                        new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
+                                        false) }), outDesc, new HashSpillableTableFactory(
+                                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
 
-            // Connect scanner with the grouper
-            IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(
-                    spec, new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }));
-            spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
-            
-            break;
-        case 1: // External-sort + new-precluster
-            ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(
-                    spec, framesLimit, keys, new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE }, inDesc);
-            createPartitionConstraint(spec, sorter2, inSplits);
+                createPartitionConstraint(spec, grouper, outSplits);
 
-            // Connect scan operator with the sorter
-            IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(
-                    spec, new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }));
-            spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
+                // Connect scanner with the grouper
+                IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
+                spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
 
-            grouper = new PreclusteredGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE },
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                    outDesc);
+                break;
+            case 1: // External-sort + new-precluster
+                ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
+                        new IBinaryComparatorFactory[] {
+                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, inDesc);
+                createPartitionConstraint(spec, sorter2, inSplits);
 
-            createPartitionConstraint(spec, grouper, outSplits);
+                // Connect scan operator with the sorter
+                IConnectorDescriptor scanSortConn2 = new MToNPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
+                spec.connect(scanSortConn2, fileScanner, 0, sorter2, 0);
 
-            // Connect sorter with the pre-cluster
-            OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(
-                    spec);
-            spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
-            break;
-        case 2: // Inmem
-            grouper = new HashGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }),
-                    new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE },
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                    outDesc, htSize);
+                grouper = new edu.uci.ics.hyracks.dataflow.std.group.PreclusteredGroupOperatorDescriptor(spec, keys,
+                        new IBinaryComparatorFactory[] {
+                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                        new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                        outDesc);
 
-            createPartitionConstraint(spec, grouper, outSplits);
+                createPartitionConstraint(spec, grouper, outSplits);
 
-            // Connect scanner with the grouper
-            IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(
-                    spec, new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }));
-            spec.connect(scanConn2, fileScanner, 0, grouper, 0);
-            break;
-        default:
-            grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(
-                    spec,
-                    keys,
-                    framesLimit,
-                    new IBinaryComparatorFactory[] {
-                    // IntegerBinaryComparatorFactory.INSTANCE,
-                    IntegerBinaryComparatorFactory.INSTANCE },
-                    new IntegerNormalizedKeyComputerFactory(),
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(
-                                    false) }),
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(
-                                    keys.length, false) }), outDesc,
-                    new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keys,
-                                    new IBinaryHashFunctionFactory[] {
-                                    // IntegerBinaryHashFunctionFactory.INSTANCE,
-                                    IntegerBinaryHashFunctionFactory.INSTANCE }),
-                            htSize), false);
-            
-            createPartitionConstraint(spec, grouper, outSplits);
+                // Connect sorter with the pre-cluster
+                OneToOneConnectorDescriptor sortGroupConn2 = new OneToOneConnectorDescriptor(spec);
+                spec.connect(sortGroupConn2, sorter2, 0, grouper, 0);
+                break;
+            case 2: // Inmem
+                grouper = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] {
+                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }),
+                        new IBinaryComparatorFactory[] {
+                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                        new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                        outDesc, htSize);
 
-            // Connect scanner with the grouper
-            IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(
-                    spec, new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] {
-                            // IntegerBinaryHashFunctionFactory.INSTANCE,
-                            IntegerBinaryHashFunctionFactory.INSTANCE }));
-            spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
+                createPartitionConstraint(spec, grouper, outSplits);
+
+                // Connect scanner with the grouper
+                IConnectorDescriptor scanConn2 = new MToNPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
+                spec.connect(scanConn2, fileScanner, 0, grouper, 0);
+                break;
+            default:
+                grouper = new edu.uci.ics.hyracks.dataflow.std.group.ExternalGroupOperatorDescriptor(spec, keys,
+                        framesLimit, new IBinaryComparatorFactory[] {
+                        // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                        new IntegerNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                        new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(keys.length,
+                                        false) }), outDesc, new HashSpillableTableFactory(
+                                new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                                // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }), htSize), false);
+
+                createPartitionConstraint(spec, grouper, outSplits);
+
+                // Connect scanner with the grouper
+                IConnectorDescriptor scanGroupConnDef = new MToNPartitioningConnectorDescriptor(spec,
+                        new FieldHashPartitionComputerFactory(keys, new IBinaryHashFunctionFactory[] {
+                        // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
+                spec.connect(scanGroupConnDef, fileScanner, 0, grouper, 0);
         }
 
-        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
-                outSplits);
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
 
         AbstractSingleActivityOperatorDescriptor writer;
 
         if (outPlain)
-            writer = new PlainFileWriterOperatorDescriptor(spec,
-                    outSplitProvider, "|");
+            writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
         else
-            writer = new FrameFileWriterOperatorDescriptor(spec,
-                    outSplitProvider);
+            writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
 
         createPartitionConstraint(spec, writer, outSplits);
 
-        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(
-                spec);
+        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(groupOutConn, grouper, 0, writer, 0);
 
         spec.addRoot(writer);
         return spec;
     }
 
-    private static void createPartitionConstraint(JobSpecification spec,
-            IOperatorDescriptor op, FileSplit[] splits) {
+    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
         String[] parts = new String[splits.length];
         for (int i = 0; i < splits.length; ++i) {
             parts[i] = splits[i].getNodeName();
         }
-        PartitionConstraintHelper
-                .addAbsoluteLocationConstraint(spec, op, parts);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
     }
 }
\ No newline at end of file
diff --git a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
index 8cac822..1708259 100644
--- a/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-examples/text-example/textclient/src/main/java/edu/uci/ics/hyracks/examples/text/client/WordCountMain.java
@@ -20,7 +20,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -33,8 +33,9 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
@@ -62,7 +63,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)")
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -94,7 +95,7 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits),
                 options.algo, options.htSize, options.sbSize, options.format);
@@ -140,19 +141,25 @@
         IOperatorDescriptor gBy;
         int[] keys = new int[] { 0 };
         if ("hash".equalsIgnoreCase(algo)) {
-            gBy = new HashGroupOperatorDescriptor(spec, keys, new FieldHashPartitionComputerFactory(keys,
-                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+            gBy = new HashGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    new FieldHashPartitionComputerFactory(keys,
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }),
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     new MultiFieldsAggregatorFactory(
                             new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                     groupResultDesc, htSize);
             createPartitionConstraint(spec, gBy, outSplits);
             IConnectorDescriptor scanGroupConn = new MToNPartitioningConnectorDescriptor(spec,
                     new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }));
             spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
         } else {
-            IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE };
+            IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                    .of(UTF8StringPointable.FACTORY) };
             IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo) ? new InMemorySortOperatorDescriptor(spec,
                     keys, new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc)
                     : new ExternalSortOperatorDescriptor(spec, sbSize, keys,
@@ -161,11 +168,14 @@
 
             IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
                     new FieldHashPartitionComputerFactory(keys,
-                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }));
             spec.connect(scanSortConn, wordScanner, 0, sorter, 0);
 
-            gBy = new PreclusteredGroupOperatorDescriptor(spec, keys,
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+            gBy = new PreclusteredGroupOperatorDescriptor(
+                    spec,
+                    keys,
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     new MultiFieldsAggregatorFactory(
                             new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                     groupResultDesc);
diff --git a/hyracks-examples/text-example/texthelper/pom.xml b/hyracks-examples/text-example/texthelper/pom.xml
index 60c423d..b0bfcef 100644
--- a/hyracks-examples/text-example/texthelper/pom.xml
+++ b/hyracks-examples/text-example/texthelper/pom.xml
@@ -2,7 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.text</groupId>
   <artifactId>texthelper</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
@@ -23,6 +22,11 @@
   		<version>0.2.0-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-data-std</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks-examples/tpch-example/tpchapp/pom.xml
index c65aaf5..05256a3 100644
--- a/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ b/hyracks-examples/tpch-example/tpchapp/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchapp</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
@@ -83,5 +81,10 @@
         <version>0.2.0-SNAPSHOT</version>
         <scope>compile</scope>
     </dependency>
+    <dependency>
+    	<groupId>edu.uci.ics.hyracks</groupId>
+    	<artifactId>hyracks-data-std</artifactId>
+    	<version>0.2.0-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks-examples/tpch-example/tpchclient/pom.xml
index c7a138a..d9b5fce 100644
--- a/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchclient</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
@@ -17,6 +15,11 @@
   		<version>0.2.0-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-data-std</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 9e88f8c..72533e7 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -20,7 +20,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
@@ -38,8 +38,9 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -68,7 +69,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)", required = false)
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -117,24 +118,16 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
-                options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
-        JobSpecification job = createJob(
-                parseFileSplits(options.inFileCustomerSplits),
-                parseFileSplits(options.inFileOrderSplits),
-                parseFileSplits(options.outFileSplits),
-                options.numJoinPartitions, options.algo,
-                options.graceInputSize, options.graceRecordsPerFrame,
-                options.graceFactor, options.memSize, options.tableSize,
-                options.hasGroupBy);
+        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
+                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
+                options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(
-                options.app,
-                job,
-                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
-                        .noneOf(JobFlag.class));
+        JobId jobId = hcc.createJob(options.app, job,
+                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.start(jobId);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
@@ -148,104 +141,65 @@
             String s = splits[i].trim();
             int idx = s.indexOf(':');
             if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s
-                        + " not well formed");
+                throw new IllegalArgumentException("File split " + s + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
-                    new File(s.substring(idx + 1))));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
         }
         return fSplits;
     }
 
-    private static JobSpecification createJob(FileSplit[] customerSplits,
-            FileSplit[] orderSplits, FileSplit[] resultSplits,
-            int numJoinPartitions, String algo, int graceInputSize,
-            int graceRecordsPerFrame, double graceFactor, int memSize,
-            int tableSize, boolean hasGroupBy) throws HyracksDataException {
+    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
+            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
+            double graceFactor, int memSize, int tableSize, boolean hasGroupBy) throws HyracksDataException {
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(
-                customerSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
-                orderSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
-                spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         createPartitionConstraint(spec, ordScanner, orderSplits);
 
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(
-                spec, custSplitsProvider, new DelimitedDataTupleParserFactory(
-                        new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE }, '|'),
-                custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         createPartitionConstraint(spec, custScanner, customerSplits);
 
         IOperatorDescriptor join;
 
         if ("nestedloop".equalsIgnoreCase(algo)) {
-            join = new NestedLoopJoinOperatorDescriptor(spec,
-                    new JoinComparatorFactory(
-                            UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                    custOrderJoinDesc, memSize);
+            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize);
 
         } else if ("gracehash".equalsIgnoreCase(algo)) {
             join = new GraceHashJoinOperatorDescriptor(
@@ -256,8 +210,9 @@
                     graceFactor,
                     new int[] { 0 },
                     new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     custOrderJoinDesc);
 
         } else if ("hybridhash".equalsIgnoreCase(algo)) {
@@ -269,8 +224,9 @@
                     graceFactor,
                     new int[] { 0 },
                     new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     custOrderJoinDesc);
 
         } else {
@@ -278,81 +234,71 @@
                     spec,
                     new int[] { 0 },
                     new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     custOrderJoinDesc, 6000000);
         }
 
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, join,
-                numJoinPartitions);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
 
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
 
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 0);
 
         IOperatorDescriptor endingOp = join;
 
         if (hasGroupBy) {
 
-            RecordDescriptor groupResultDesc = new RecordDescriptor(
-                    new ISerializerDeserializer[] {
-                            UTF8StringSerializerDeserializer.INSTANCE,
-                            IntegerSerializerDeserializer.INSTANCE });
+            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
             HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
                     spec,
                     new int[] { 6 },
-                    new FieldHashPartitionComputerFactory(
-                            new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }),
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     new MultiFieldsAggregatorFactory(
                             new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                     groupResultDesc, 16);
             createPartitionConstraint(spec, gby, resultSplits);
 
-            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(
-                    spec,
-                    new FieldHashPartitionComputerFactory(
-                            new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }));
             spec.connect(joinGroupConn, join, 0, gby, 0);
 
             endingOp = gby;
         }
 
-        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
-                resultSplits);
-        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(
-                spec, outSplitProvider);
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
         createPartitionConstraint(spec, writer, resultSplits);
 
-        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(
-                spec);
+        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
 
         spec.addRoot(writer);
         return spec;
     }
 
-    private static void createPartitionConstraint(JobSpecification spec,
-            IOperatorDescriptor op, FileSplit[] splits) {
+    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
         String[] parts = new String[splits.length];
         for (int i = 0; i < splits.length; ++i) {
             parts[i] = splits[i].getNodeName();
         }
-        PartitionConstraintHelper
-                .addAbsoluteLocationConstraint(spec, op, parts);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
     }
 
     static class JoinComparatorFactory implements ITuplePairComparatorFactory {
@@ -362,8 +308,7 @@
         private final int pos0;
         private final int pos1;
 
-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
-                int pos0, int pos1) {
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
             this.bFactory = bFactory;
             this.pos0 = pos0;
             this.pos1 = pos1;
@@ -371,8 +316,7 @@
 
         @Override
         public ITuplePairComparator createTuplePairComparator() {
-            return new JoinComparator(bFactory.createBinaryComparator(), pos0,
-                    pos1);
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
         }
     }
 
@@ -382,16 +326,14 @@
         private final int field0;
         private final int field1;
 
-        public JoinComparator(IBinaryComparator bComparator, int field0,
-                int field1) {
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
             this.bComparator = bComparator;
             this.field0 = field0;
             this.field1 = field1;
         }
 
         @Override
-        public int compare(IFrameTupleAccessor accessor0, int tIndex0,
-                IFrameTupleAccessor accessor1, int tIndex1) {
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
             int tStart0 = accessor0.getTupleStartOffset(tIndex0);
             int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
 
@@ -406,9 +348,8 @@
             int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
             int fLen1 = fEnd1 - fStart1;
 
-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
-                    + fStartOffset0, fLen0, accessor1.getBuffer().array(),
-                    fStart1 + fStartOffset1, fLen1);
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
             if (c != 0) {
                 return c;
             }