merge hyracks_dev_next r847:977

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@978 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks-examples/hyracks-integration-tests/pom.xml
index 7012cde..d74b384 100644
--- a/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>hyracks-integration-tests</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
@@ -80,5 +78,10 @@
   		<type>jar</type>
   		<scope>test</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-data-std</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
index 38dd051..6ef1740 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexScanOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -76,7 +77,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -98,13 +99,13 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
     }
@@ -131,7 +132,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
index a043675..51dbb69 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexSearchOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -76,7 +77,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -98,13 +99,13 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
     }
@@ -131,7 +132,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
index 51d3db1..a804bae 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreePrimaryIndexStatsOperatorTest.java
@@ -27,11 +27,12 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -73,7 +74,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -95,13 +96,13 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
     }
@@ -128,7 +129,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
index 3b6b202..a16bbe4 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexInsertOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -79,7 +80,7 @@
 
     // field, type and key declarations for primary index
     private int primaryFieldCount = 6;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -100,7 +101,7 @@
 
     // field, type and key declarations for secondary indexes
     private int secondaryFieldCount = 2;
-    private ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
+    private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
     private int secondaryKeyFieldCount = 2;
     private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
     private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
@@ -121,19 +122,19 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary index
-        primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         // field, type and key declarations for secondary indexes
-        secondaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        secondaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        secondaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-        secondaryComparatorFactories[1] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         loadPrimaryIndexTest();
         loadSecondaryIndexTest();
@@ -162,7 +163,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
@@ -211,7 +213,8 @@
 
         // sort based on secondary keys
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 3, 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, primaryRecDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                primaryRecDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         // load secondary index
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
index 1e070a8..ef1c6f6 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/btree/BTreeSecondaryIndexSearchOperatorTest.java
@@ -28,12 +28,13 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -62,308 +63,238 @@
 import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
 import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
 
-public class BTreeSecondaryIndexSearchOperatorTest extends
-		AbstractIntegrationTest {
-	static {
-		TestStorageManagerComponentHolder.init(8192, 20, 20);
-	}
+public class BTreeSecondaryIndexSearchOperatorTest extends AbstractIntegrationTest {
+    static {
+        TestStorageManagerComponentHolder.init(8192, 20, 20);
+    }
 
-	private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
-	private IIndexRegistryProvider<IIndex> indexRegistryProvider = new TestIndexRegistryProvider();
-	private IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
+    private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
+    private IIndexRegistryProvider<IIndex> indexRegistryProvider = new TestIndexRegistryProvider();
+    private IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
 
-	private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat(
-			"ddMMyy-hhmmssSS");
-	private final static String sep = System.getProperty("file.separator");
+    private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+    private final static String sep = System.getProperty("file.separator");
 
-	// field, type and key declarations for primary index
-	private int primaryFieldCount = 6;
-	private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
-	private int primaryKeyFieldCount = 1;
-	private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-	private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
-			primaryTypeTraits);
-	private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-			primaryTupleWriterFactory);
-	private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(
-			primaryTupleWriterFactory);
+    // field, type and key declarations for primary index
+    private int primaryFieldCount = 6;
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+    private int primaryKeyFieldCount = 1;
+    private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
+    private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
+    private ITreeIndexFrameFactory primaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
+            primaryTupleWriterFactory);
+    private ITreeIndexFrameFactory primaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(primaryTupleWriterFactory);
 
-	private static String primaryBtreeName = "primary"
-			+ simpleDateFormat.format(new Date());
-	private static String primaryFileName = System
-			.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
+    private static String primaryBtreeName = "primary" + simpleDateFormat.format(new Date());
+    private static String primaryFileName = System.getProperty("java.io.tmpdir") + sep + primaryBtreeName;
 
-	private IFileSplitProvider primaryBtreeSplitProvider = new ConstantFileSplitProvider(
-			new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-					primaryFileName))) });
+    private IFileSplitProvider primaryBtreeSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
 
-	private RecordDescriptor primaryRecDesc = new RecordDescriptor(
-			new ISerializerDeserializer[] {
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE });
+    private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-	// field, type and key declarations for secondary indexes
-	private int secondaryFieldCount = 2;
-	private ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
-	private int secondaryKeyFieldCount = 2;
-	private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
-	private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
-			secondaryTypeTraits);
-	private ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
-			secondaryTupleWriterFactory);
-	private ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(
-			secondaryTupleWriterFactory);
+    // field, type and key declarations for secondary indexes
+    private int secondaryFieldCount = 2;
+    private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
+    private int secondaryKeyFieldCount = 2;
+    private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
+    private TypeAwareTupleWriterFactory secondaryTupleWriterFactory = new TypeAwareTupleWriterFactory(
+            secondaryTypeTraits);
+    private ITreeIndexFrameFactory secondaryInteriorFrameFactory = new BTreeNSMInteriorFrameFactory(
+            secondaryTupleWriterFactory);
+    private ITreeIndexFrameFactory secondaryLeafFrameFactory = new BTreeNSMLeafFrameFactory(secondaryTupleWriterFactory);
 
-	private static String secondaryBtreeName = "secondary"
-			+ simpleDateFormat.format(new Date());
-	private static String secondaryFileName = System
-			.getProperty("java.io.tmpdir") + sep + secondaryBtreeName;
+    private static String secondaryBtreeName = "secondary" + simpleDateFormat.format(new Date());
+    private static String secondaryFileName = System.getProperty("java.io.tmpdir") + sep + secondaryBtreeName;
 
-	private IFileSplitProvider secondaryBtreeSplitProvider = new ConstantFileSplitProvider(
-			new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-					secondaryFileName))) });
+    private IFileSplitProvider secondaryBtreeSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(secondaryFileName))) });
 
-	private RecordDescriptor secondaryRecDesc = new RecordDescriptor(
-			new ISerializerDeserializer[] {
-					UTF8StringSerializerDeserializer.INSTANCE,
-					UTF8StringSerializerDeserializer.INSTANCE });
+    private RecordDescriptor secondaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-	@Before
-	public void setup() throws Exception {
-		// field, type and key declarations for primary index
-		primaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		primaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+    @Before
+    public void setup() throws Exception {
+        // field, type and key declarations for primary index
+        primaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
-		// field, type and key declarations for secondary indexes
-		secondaryTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		secondaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-		secondaryComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
-		secondaryComparatorFactories[1] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        // field, type and key declarations for secondary indexes
+        secondaryTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        secondaryComparatorFactories[1] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
-		loadPrimaryIndexTest();
-		loadSecondaryIndexTest();
-	}
+        loadPrimaryIndexTest();
+        loadSecondaryIndexTest();
+    }
 
-	public void loadPrimaryIndexTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    public void loadPrimaryIndexTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID,
-				new FileReference(new File("data/tpch0.001/orders-part1.tbl"))) };
-		IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(
-				ordersSplits);
-		RecordDescriptor ordersDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] {
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE,
-						UTF8StringSerializerDeserializer.INSTANCE });
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/orders-part1.tbl"))) };
+        IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-		FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
-				spec, ordersSplitProvider, new DelimitedDataTupleParserFactory(
-						new IValueParserFactory[] {
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE,
-								UTF8StringParserFactory.INSTANCE }, '|'),
-				ordersDesc);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				ordScanner, NC1_ID);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec,
-				1000,
-				new int[] { 0 },
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				ordersDesc);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-				NC1_ID);
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
-		int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
-		TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManager, indexRegistryProvider,
-				primaryBtreeSplitProvider, primaryInteriorFrameFactory,
-				primaryLeafFrameFactory, primaryTypeTraits,
-				primaryComparatorFactories, fieldPermutation, 0.7f,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				primaryBtreeBulkLoad, NC1_ID);
+        int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
+        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
+                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0,
-				sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
-				primaryBtreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, primaryBtreeBulkLoad, 0);
 
-		spec.addRoot(primaryBtreeBulkLoad);
-		runTest(spec);
-	}
+        spec.addRoot(primaryBtreeBulkLoad);
+        runTest(spec);
+    }
 
-	public void loadSecondaryIndexTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    public void loadSecondaryIndexTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		// build dummy tuple containing nothing
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
-		DataOutput dos = tb.getDataOutput();
+        // build dummy tuple containing nothing
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
+        DataOutput dos = tb.getDataOutput();
 
-		tb.reset();
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-		tb.addFieldEndOffset();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
 
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
 
-		ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				keyProviderOp, NC1_ID);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
 
-		int[] lowKeyFields = null; // - infinity
-		int[] highKeyFields = null; // + infinity
+        int[] lowKeyFields = null; // - infinity
+        int[] highKeyFields = null; // + infinity
 
-		// scan primary index
-		BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc, storageManager,
-				indexRegistryProvider, primaryBtreeSplitProvider,
-				primaryInteriorFrameFactory, primaryLeafFrameFactory,
-				primaryTypeTraits, primaryComparatorFactories, true,
-				lowKeyFields, highKeyFields, true, true, dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				primaryBtreeSearchOp, NC1_ID);
+        // scan primary index
+        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
+                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, lowKeyFields,
+                highKeyFields, true, true, dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
-		// sort based on secondary keys
-		ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-				spec,
-				1000,
-				new int[] { 3, 0 },
-				new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-				primaryRecDesc);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-				NC1_ID);
+        // sort based on secondary keys
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 3, 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                primaryRecDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
-		// load secondary index
-		int[] fieldPermutation = { 3, 0 };
-		TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, storageManager, indexRegistryProvider,
-				secondaryBtreeSplitProvider, secondaryInteriorFrameFactory,
-				secondaryLeafFrameFactory, secondaryTypeTraits,
-				secondaryComparatorFactories, fieldPermutation, 0.7f,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				secondaryBtreeBulkLoad, NC1_ID);
+        // load secondary index
+        int[] fieldPermutation = { 3, 0 };
+        TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, indexRegistryProvider, secondaryBtreeSplitProvider, secondaryInteriorFrameFactory,
+                secondaryLeafFrameFactory, secondaryTypeTraits, secondaryComparatorFactories, fieldPermutation, 0.7f,
+                dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0,
-				primaryBtreeSearchOp, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec),
-				primaryBtreeSearchOp, 0, sorter, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
-				secondaryBtreeBulkLoad, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, sorter, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, secondaryBtreeBulkLoad, 0);
 
-		spec.addRoot(secondaryBtreeBulkLoad);
-		runTest(spec);
-	}
+        spec.addRoot(secondaryBtreeBulkLoad);
+        runTest(spec);
+    }
 
-	@Test
-	public void searchSecondaryIndexTest() throws Exception {
-		JobSpecification spec = new JobSpecification();
+    @Test
+    public void searchSecondaryIndexTest() throws Exception {
+        JobSpecification spec = new JobSpecification();
 
-		// build tuple containing search keys (only use the first key as search
-		// key)
-		ArrayTupleBuilder tb = new ArrayTupleBuilder(secondaryKeyFieldCount);
-		DataOutput dos = tb.getDataOutput();
+        // build tuple containing search keys (only use the first key as search
+        // key)
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(secondaryKeyFieldCount);
+        DataOutput dos = tb.getDataOutput();
 
-		tb.reset();
-		// low key
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("1998-07-21", dos);
-		tb.addFieldEndOffset();
-		// high key
-		UTF8StringSerializerDeserializer.INSTANCE.serialize("2000-10-18", dos);
-		tb.addFieldEndOffset();
+        tb.reset();
+        // low key
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("1998-07-21", dos);
+        tb.addFieldEndOffset();
+        // high key
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("2000-10-18", dos);
+        tb.addFieldEndOffset();
 
-		ISerializerDeserializer[] keyRecDescSers = {
-				UTF8StringSerializerDeserializer.INSTANCE,
-				UTF8StringSerializerDeserializer.INSTANCE };
-		RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
 
-		ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
-				spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
-				tb.getSize());
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				keyProviderOp, NC1_ID);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
 
-		int[] secondaryLowKeyFields = { 0 };
-		int[] secondaryHighKeyFields = { 1 };
+        int[] secondaryLowKeyFields = { 0 };
+        int[] secondaryHighKeyFields = { 1 };
 
-		// search secondary index
-		BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(
-				spec, secondaryRecDesc, storageManager,
-				indexRegistryProvider, secondaryBtreeSplitProvider,
-				secondaryInteriorFrameFactory, secondaryLeafFrameFactory,
-				secondaryTypeTraits, secondaryComparatorFactories, true,
-				secondaryLowKeyFields, secondaryHighKeyFields, true, true,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				secondaryBtreeSearchOp, NC1_ID);
+        // search secondary index
+        BTreeSearchOperatorDescriptor secondaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec,
+                secondaryRecDesc, storageManager, indexRegistryProvider, secondaryBtreeSplitProvider,
+                secondaryInteriorFrameFactory, secondaryLeafFrameFactory, secondaryTypeTraits,
+                secondaryComparatorFactories, true, secondaryLowKeyFields, secondaryHighKeyFields, true, true,
+                dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeSearchOp, NC1_ID);
 
-		int[] primaryLowKeyFields = { 1 }; // second field from the tuples
-		// coming from secondary index
-		int[] primaryHighKeyFields = { 1 }; // second field from the tuples
-		// coming from secondary index
+        int[] primaryLowKeyFields = { 1 }; // second field from the tuples
+        // coming from secondary index
+        int[] primaryHighKeyFields = { 1 }; // second field from the tuples
+        // coming from secondary index
 
-		// search primary index
-		BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(
-				spec, primaryRecDesc, storageManager,
-				indexRegistryProvider, primaryBtreeSplitProvider,
-				primaryInteriorFrameFactory, primaryLeafFrameFactory,
-				primaryTypeTraits, primaryComparatorFactories, true,
-				primaryLowKeyFields, primaryHighKeyFields, true, true,
-				dataflowHelperFactory);
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-				primaryBtreeSearchOp, NC1_ID);
+        // search primary index
+        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, indexRegistryProvider, primaryBtreeSplitProvider, primaryInteriorFrameFactory,
+                primaryLeafFrameFactory, primaryTypeTraits, primaryComparatorFactories, true, primaryLowKeyFields,
+                primaryHighKeyFields, true, true, dataflowHelperFactory);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
                 createTempFile().getAbsolutePath()) });
         IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-				NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
-		spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0,
-				secondaryBtreeSearchOp, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec),
-				secondaryBtreeSearchOp, 0, primaryBtreeSearchOp, 0);
-		spec.connect(new OneToOneConnectorDescriptor(spec),
-				primaryBtreeSearchOp, 0, printer, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, secondaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBtreeSearchOp, 0, primaryBtreeSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, printer, 0);
 
-		spec.addRoot(printer);
-		runTest(spec);
-	}
+        spec.addRoot(printer);
+        runTest(spec);
+    }
 
-	@AfterClass
-	public static void cleanup() throws Exception {
-		File primary = new File(primaryFileName);
-		primary.deleteOnExit();
-		File secondary = new File(secondaryFileName);
-		secondary.deleteOnExit();
-	}
+    @AfterClass
+    public static void cleanup() throws Exception {
+        File primary = new File(primaryFileName);
+        primary.deleteOnExit();
+        File secondary = new File(secondaryFileName);
+        secondary.deleteOnExit();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index abedf4a..fac141c 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -28,7 +28,7 @@
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
-import edu.uci.ics.hyracks.api.client.HyracksLocalConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -61,7 +61,10 @@
     @BeforeClass
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.port = 39001;
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = 39000;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = 39001;
         ccConfig.profileDumpPeriod = 10000;
         File outDir = new File("target/ClusterController");
         outDir.mkdirs();
@@ -75,6 +78,7 @@
         NCConfig ncConfig1 = new NCConfig();
         ncConfig1.ccHost = "localhost";
         ncConfig1.ccPort = 39001;
+        ncConfig1.clusterNetIPAddress = "127.0.0.1";
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
         nc1 = new NodeControllerService(ncConfig1);
@@ -83,12 +87,13 @@
         NCConfig ncConfig2 = new NCConfig();
         ncConfig2.ccHost = "localhost";
         ncConfig2.ccPort = 39001;
+        ncConfig2.clusterNetIPAddress = "127.0.0.1";
         ncConfig2.dataIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
-        hcc = new HyracksLocalConnection(cc);
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
         hcc.createApplication("test", null);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
index 042c919..1c5c014 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTests.java
@@ -27,8 +27,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -59,61 +60,40 @@
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MinMaxStringFieldAggregatorFactory;
 import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 
 /**
  *
  */
 public class AggregationTests extends AbstractIntegrationTest {
 
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
-             "data/tpch0.001/lineitem.tbl"))) });
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
+            new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
 
-    final RecordDescriptor desc = new RecordDescriptor(
-            new ISerializerDeserializer[] {
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    IntegerSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    FloatSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE,
-                    UTF8StringSerializerDeserializer.INSTANCE });
+    final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
-            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE,
-                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE,
-                    UTF8StringParserFactory.INSTANCE, }, '|');
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+            UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, }, '|');
 
-    private AbstractSingleActivityOperatorDescriptor getPrinter(
-            JobSpecification spec, String prefix) throws IOException {
+    private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix)
+            throws IOException {
 
-        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
-                spec, new ConstantFileSplitProvider(new FileSplit[] {
-                        new FileSplit(NC1_ID, createTempFile()
-                                .getAbsolutePath()),
-                        new FileSplit(NC2_ID, createTempFile()
-                                .getAbsolutePath()) }), "\t");
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
+                new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit(NC1_ID, createTempFile().getAbsolutePath()),
+                        new FileSplit(NC2_ID, createTempFile().getAbsolutePath()) }), "\t");
 
         return printer;
     }
@@ -122,49 +102,38 @@
     public void singleKeySumInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -177,64 +146,34 @@
     public void singleKeySumPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        int[] keyFields = new int[] { 0 };
+
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -247,58 +186,41 @@
     public void singleKeySumExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(3, false) }),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(2, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(2, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeySumExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeySumExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -311,51 +233,38 @@
     public void singleKeyAvgInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -368,66 +277,34 @@
     public void singleKeyAvgPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
 
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                desc);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
-
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -440,61 +317,43 @@
     public void singleKeyAvgExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new CountFieldAggregatorFactory(false),
-                                new AvgFieldGroupAggregatorFactory(1, false) }),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
                                 new IntSumFieldAggregatorFactory(2, false),
-                                new AvgFieldMergeAggregatorFactory(3, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+                                new AvgFieldMergeAggregatorFactory(3, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -507,49 +366,38 @@
     public void singleKeyMinMaxStringInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -562,64 +410,34 @@
     public void singleKeyMinMaxStringPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec);
+        int[] keyFields = new int[] { 0 };
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgInmemGroupTest");
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -632,60 +450,42 @@
     public void singleKeyMinMaxStringExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, true) }),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(2, true,
-                                        true) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false),
+                        new MinMaxStringFieldAggregatorFactory(2, true, true) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "singleKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "singleKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -698,53 +498,39 @@
     public void multiKeySumInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -757,67 +543,35 @@
     public void multiKeySumPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 8, 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE},
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec, keyFields, new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new IntSumFieldAggregatorFactory(3, true) }),
+        int[] keyFields = new int[] { 8, 0 };
+
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
                 outputRec);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeySumInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -830,64 +584,43 @@
     public void multiKeySumExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int[] keys = new int[] { 0, 1 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(keyFields,
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new IntSumFieldAggregatorFactory(3, false) }),
-                new MultiFieldsAggregatorFactory(keys,
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(2, false),
-                                new IntSumFieldAggregatorFactory(3, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
+                                new IntSumFieldAggregatorFactory(3, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeySumExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeySumExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -900,55 +633,40 @@
     public void multiKeyAvgInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyAvgInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -961,69 +679,36 @@
     public void multiKeyAvgPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 8, 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE},
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec, keyFields, new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new CountFieldAggregatorFactory(true),
-                                new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec);
+        int[] keyFields = new int[] { 8, 0 };
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
+                        new AvgFieldGroupAggregatorFactory(1, true) }), outputRec);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyAvgInmemGroupTest");
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1036,66 +721,46 @@
     public void multiKeyAvgExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        FloatSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+                FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new CountFieldAggregatorFactory(false),
-                                new AvgFieldGroupAggregatorFactory(1, false) }),
-                new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(2, false),
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, false), new CountFieldAggregatorFactory(false),
+                        new AvgFieldGroupAggregatorFactory(1, false) }), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(2, false),
                                 new IntSumFieldAggregatorFactory(3, false),
-                                new AvgFieldMergeAggregatorFactory(4, false) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
+                                new AvgFieldMergeAggregatorFactory(4, false) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyAvgExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyAvgExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1108,53 +773,39 @@
     public void multiKeyMinMaxStringInmemGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int tableSize = 8;
 
-        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec, tableSize);
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec, tableSize);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyMinMaxStringInmemGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringInmemGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1167,67 +818,35 @@
     public void multiKeyMinMaxStringPreClusterGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
-
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
-
-        int[] keyFields = new int[] { 8, 0 };
-        
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
-                spec,
-                4,
-                keyFields,
-                null,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE},
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
                 desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        IConnectorDescriptor conn0 = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE}));
-        
-        spec.connect(conn0, csvScanner, 0, sorter, 0);
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(
-                spec, keyFields, new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, true),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, false) }), outputRec);
+        int[] keyFields = new int[] { 8, 0 };
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
+        PreclusteredGroupOperatorDescriptor grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(1, true),
+                        new MinMaxStringFieldAggregatorFactory(15, true, false) }), outputRec);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(conn1, sorter, 0, grouper, 0);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyMinMaxStringPreClusterGroupTest");
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringPreClusterGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -1240,65 +859,44 @@
     public void multiKeyMinMaxStringExtGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
-                spec, splitProvider, tupleParserFactory, desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+                desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
-                csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        IntegerSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
         int frameLimits = 4;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
-                spec,
-                keyFields,
-                frameLimits,
-                new IBinaryComparatorFactory[] {
-                        UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(15,
-                                        true, true) }),
-                new MultiFieldsAggregatorFactory(new int[] { 0, 1 },
-                        new IFieldAggregateDescriptorFactory[] {
-                                new IntSumFieldAggregatorFactory(2, false),
-                                new MinMaxStringFieldAggregatorFactory(3, true,
-                                        true) }),
-                outputRec,
-                new HashSpillableTableFactory(
-                        new FieldHashPartitionComputerFactory(
-                                keyFields,
-                                new IBinaryHashFunctionFactory[] {
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                        tableSize), true);
-
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
-                NC2_ID, NC1_ID);
-
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
-                spec, new FieldHashPartitionComputerFactory(keyFields,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory(), new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                new MultiFieldsAggregatorFactory(new int[] { 0, 1 }, new IFieldAggregateDescriptorFactory[] {
+                        new IntSumFieldAggregatorFactory(2, false),
+                        new MinMaxStringFieldAggregatorFactory(3, true, true) }), outputRec,
+                new HashSpillableTableFactory(new FieldHashPartitionComputerFactory(keyFields,
                         new IBinaryHashFunctionFactory[] {
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), tableSize), true);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
-                "multiKeyMinMaxStringExtGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "multiKeyMinMaxStringExtGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
-                NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index 1c96755..ce72ec5 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -27,9 +27,10 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -69,7 +70,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC1_ID);
 
         InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                desc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -77,19 +79,19 @@
         PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
                 spec,
                 new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter2, NC1_ID);
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID);
 
@@ -100,7 +102,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -108,7 +111,8 @@
 
         IConnectorDescriptor conn3 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -138,7 +142,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC1_ID);
 
         InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                desc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -146,19 +151,19 @@
         PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
                 spec,
                 new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter2, NC1_ID, NC2_ID);
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
@@ -169,7 +174,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -177,7 +183,8 @@
 
         IConnectorDescriptor conn3 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -207,7 +214,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, desc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                desc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -215,19 +223,19 @@
         PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(
                 spec,
                 new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
                 desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, desc2);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, desc2);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter2, NC1_ID, NC2_ID);
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, new MultiFieldsAggregatorFactory(
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }), desc3);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
@@ -238,7 +246,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
@@ -246,7 +255,8 @@
 
         IConnectorDescriptor conn3 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index 55a0c1c..9355110 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -26,8 +26,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.IntegerBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -104,7 +105,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(conn1, ordScanner, 0, printer, 0);
 
         spec.addRoot(printer);
@@ -140,7 +142,8 @@
 
         IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { IntegerBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(IntegerPointable.FACTORY) }));
         spec.connect(conn1, ordScanner, 0, printer, 0);
 
         spec.addRoot(printer);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 57888c8..2c3fddf 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -26,8 +26,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -68,7 +69,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -78,10 +80,12 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 1 }, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }),
-                sorter, 0, printer, 0);
+        spec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                        new int[] { 1 }, new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }), new int[] { 1 },
+                        new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                                .of(UTF8StringPointable.FACTORY) }), sorter, 0, printer, 0);
 
         runTest(spec);
     }
@@ -110,8 +114,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 4, new int[] { 1, 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -121,11 +125,15 @@
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                new int[] { 1, 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE }), new int[] { 1, 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }), sorter, 0, printer, 0);
+        spec.connect(
+                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
+                        1, 0 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
+                        new IBinaryComparatorFactory[] {
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }), sorter, 0,
+                printer, 0);
 
         runTest(spec);
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 2fbf6d2..61d4696 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -32,8 +32,9 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -138,9 +139,13 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -211,10 +216,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 4, 10, 200, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
+                spec,
+                4,
+                10,
+                200,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -285,10 +297,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
+                spec,
+                5,
+                20,
+                200,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -359,15 +378,18 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
         for (int j = 0; j < nullWriterFactories.length; j++) {
             nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
         }
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
-                nullWriterFactories, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 0 },
+                new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, true, nullWriterFactories, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -438,16 +460,22 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
         for (int j = 0; j < nullWriterFactories.length; j++) {
             nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
         }
 
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
-                new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
-                nullWriterFactories);
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
+                spec,
+                5,
+                20,
+                200,
+                1.2,
+                new int[] { 0 },
+                new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -518,16 +546,22 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFieldCount()];
         for (int j = 0; j < nullWriterFactories.length; j++) {
             nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
         }
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
-                new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
-                nullWriterFactories);
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
+                spec,
+                5,
+                20,
+                200,
+                1.2,
+                new int[] { 0 },
+                new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -600,9 +634,13 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -612,12 +650,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -679,10 +719,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(
+                spec,
+                3,
+                20,
+                100,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -692,12 +739,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -759,10 +808,17 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(
+                spec,
+                3,
+                20,
+                100,
+                1.2,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -772,12 +828,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -839,9 +897,13 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -851,12 +913,14 @@
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -924,9 +988,13 @@
         MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custMat, NC1_ID, NC2_ID);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(
+                spec,
+                new int[] { 1 },
+                new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -936,12 +1004,14 @@
 
         IConnectorDescriptor ordPartConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
 
         IConnectorDescriptor custPartConn = new MToNPartitioningConnectorDescriptor(spec,
                 new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custPartConn, custScanner, 0, custMat, 0);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index 135f1fe..6411390 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -30,7 +30,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
@@ -163,7 +164,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 4);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 4);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -237,7 +238,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 5);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 5);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
@@ -311,7 +312,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
         NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), custOrderJoinDesc, 6);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
index 6440db3..298c146 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/invertedindex/WordInvertedIndexTest.java
@@ -27,14 +27,15 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -99,7 +100,7 @@
 
     // Primary BTree index.
     private int primaryFieldCount = 2;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private int primaryKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryTupleWriterFactory = new TypeAwareTupleWriterFactory(primaryTypeTraits);
@@ -110,12 +111,12 @@
             IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
     // Inverted index BTree dictionary.
-    private ITypeTrait[] tokenTypeTraits = new ITypeTrait[1];
+    private ITypeTraits[] tokenTypeTraits = new ITypeTraits[1];
     private IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[1];
 
     // Inverted index stuff.
     private int invListElementFieldCount = 1;
-    private ITypeTrait[] invListsTypeTraits = new ITypeTrait[invListElementFieldCount];
+    private ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
     private IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
     private RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
             UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
@@ -130,17 +131,17 @@
     @Before
     public void setup() throws Exception {
         // Field declarations and comparators for primary BTree index.
-        primaryTypeTraits[0] = ITypeTrait.INTEGER_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
         // Field declarations and comparators for tokens.
-        tokenTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        tokenComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        tokenTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        tokenComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         // Field declarations and comparators for inverted lists.
-        invListsTypeTraits[0] = ITypeTrait.INTEGER_TYPE_TRAIT;
-        invListsComparatorFactories[0] = IntegerBinaryComparatorFactory.INSTANCE;
+        invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
 
         loadPrimaryIndex();
         printPrimaryIndex();
@@ -232,8 +233,8 @@
     private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
             RecordDescriptor outputRecDesc) {
         ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        IntegerBinaryComparatorFactory.INSTANCE }, outputRecDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, outputRecDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
         return externalSortOp;
     }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
index 74dc4a8..0c0df42 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexSearchOperatorTest.java
@@ -28,12 +28,14 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.DoubleBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
@@ -80,7 +82,7 @@
     // field, type and key declarations for primary R-tree index
     private int primaryFieldCount = 5;
     private int primaryKeyFieldCount = 4;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
 
     private RTreeTypeAwareTupleWriterFactory primaryTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(
@@ -103,18 +105,18 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary R-tree index
-        primaryTypeTraits[0] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = DoubleBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY);
         primaryComparatorFactories[1] = primaryComparatorFactories[0];
         primaryComparatorFactories[2] = primaryComparatorFactories[0];
         primaryComparatorFactories[3] = primaryComparatorFactories[0];
 
         IPrimitiveValueProviderFactory[] primaryValueProviderFactories = RTreeUtils
-                .comparatorFactoriesToPrimitiveValueProviderFactories(primaryComparatorFactories);
+                .createPrimitiveValueProviderFactories(primaryComparatorFactories.length, DoublePointable.FACTORY);
 
         primaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(primaryTupleWriterFactory,
                 primaryValueProviderFactories);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
index 62fc150..a4912e8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreePrimaryIndexStatsOperatorTest.java
@@ -27,11 +27,12 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.DoubleBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
@@ -77,7 +78,7 @@
     // field, type and key declarations for primary R-tree index
     private int primaryFieldCount = 5;
     private int primaryKeyFieldCount = 4;
-    private ITypeTrait[] primaryTypeTraits = new ITypeTrait[primaryFieldCount];
+    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
     private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
 
     private RTreeTypeAwareTupleWriterFactory primaryTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(
@@ -100,18 +101,18 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary R-tree index
-        primaryTypeTraits[0] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[1] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[2] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[3] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryComparatorFactories[0] = DoubleBinaryComparatorFactory.INSTANCE;
+        primaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[2] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[3] = DoublePointable.TYPE_TRAITS;
+        primaryTypeTraits[4] = DoublePointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY);
         primaryComparatorFactories[1] = primaryComparatorFactories[0];
         primaryComparatorFactories[2] = primaryComparatorFactories[0];
         primaryComparatorFactories[3] = primaryComparatorFactories[0];
 
         IPrimitiveValueProviderFactory[] primaryValueProviderFactories = RTreeUtils
-                .comparatorFactoriesToPrimitiveValueProviderFactories(primaryComparatorFactories);
+                .createPrimitiveValueProviderFactories(primaryComparatorFactories.length, DoublePointable.FACTORY);
 
         primaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(primaryTupleWriterFactory,
                 primaryValueProviderFactories);
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 300a726..025f675 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -28,13 +28,14 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.DoublePointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.DoubleBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
@@ -88,7 +89,7 @@
 
     // field, type and key declarations for primary B-tree index
     private int primaryBTreeFieldCount = 10;
-    private ITypeTrait[] primaryBTreeTypeTraits = new ITypeTrait[primaryBTreeFieldCount];
+    private ITypeTraits[] primaryBTreeTypeTraits = new ITypeTraits[primaryBTreeFieldCount];
     private int primaryBTreeKeyFieldCount = 1;
     private IBinaryComparatorFactory[] primaryBTreeComparatorFactories = new IBinaryComparatorFactory[primaryBTreeKeyFieldCount];
     private TypeAwareTupleWriterFactory primaryBTreeTupleWriterFactory = new TypeAwareTupleWriterFactory(
@@ -113,7 +114,7 @@
 
     // field, type and key declarations for secondary indexes
     private int secondaryFieldCount = 5;
-    private ITypeTrait[] secondaryTypeTraits = new ITypeTrait[secondaryFieldCount];
+    private ITypeTraits[] secondaryTypeTraits = new ITypeTraits[secondaryFieldCount];
     private int secondaryKeyFieldCount = 4;
     private IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[secondaryKeyFieldCount];
     private IPrimitiveValueProviderFactory[] secondaryValueProviderFactories = new IPrimitiveValueProviderFactory[secondaryKeyFieldCount];
@@ -138,25 +139,25 @@
     @Before
     public void setup() throws Exception {
         // field, type and key declarations for primary B-tree index
-        primaryBTreeTypeTraits[0] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[1] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[2] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[3] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[5] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        primaryBTreeTypeTraits[6] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeTypeTraits[7] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeTypeTraits[8] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeTypeTraits[9] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        primaryBTreeComparatorFactories[0] = UTF8StringBinaryComparatorFactory.INSTANCE;
+        primaryBTreeTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[2] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[3] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[5] = UTF8StringPointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[6] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[7] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[8] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeTypeTraits[9] = DoublePointable.TYPE_TRAITS;
+        primaryBTreeComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
 
         // field, type and key declarations for secondary indexes
-        secondaryTypeTraits[0] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[1] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[2] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[3] = ITypeTrait.DOUBLE_TYPE_TRAIT;
-        secondaryTypeTraits[4] = ITypeTrait.VARLEN_TYPE_TRAIT;
-        secondaryComparatorFactories[0] = DoubleBinaryComparatorFactory.INSTANCE;
+        secondaryTypeTraits[0] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[1] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[2] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[3] = DoublePointable.TYPE_TRAITS;
+        secondaryTypeTraits[4] = UTF8StringPointable.TYPE_TRAITS;
+        secondaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(DoublePointable.FACTORY);
         secondaryComparatorFactories[1] = secondaryComparatorFactories[0];
         secondaryComparatorFactories[2] = secondaryComparatorFactories[0];
         secondaryComparatorFactories[3] = secondaryComparatorFactories[0];
@@ -166,7 +167,7 @@
         secondaryValueProviderFactories[3] = secondaryValueProviderFactories[0];
 
         IPrimitiveValueProviderFactory[] secondaryValueProviderFactories = RTreeUtils
-                .comparatorFactoriesToPrimitiveValueProviderFactories(secondaryComparatorFactories);
+                .createPrimitiveValueProviderFactories(secondaryComparatorFactories.length, DoublePointable.FACTORY);
 
         secondaryInteriorFrameFactory = new RTreeNSMInteriorFrameFactory(secondaryTupleWriterFactory,
                 secondaryValueProviderFactories);
@@ -203,7 +204,8 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1000, new int[] { 0 },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID);
 
         int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };