Fixed NC memory leak. Fixed duplicate job cleanup. Cleaned up data treatment in Hyracks

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@865 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks-examples/tpch-example/tpchapp/pom.xml
index c65aaf5..05256a3 100644
--- a/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ b/hyracks-examples/tpch-example/tpchapp/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchapp</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
@@ -83,5 +81,10 @@
         <version>0.2.0-SNAPSHOT</version>
         <scope>compile</scope>
     </dependency>
+    <dependency>
+    	<groupId>edu.uci.ics.hyracks</groupId>
+    	<artifactId>hyracks-data-std</artifactId>
+    	<version>0.2.0-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks-examples/tpch-example/tpchclient/pom.xml
index c7a138a..d9b5fce 100644
--- a/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -2,8 +2,6 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchclient</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
@@ -17,6 +15,11 @@
   		<version>0.2.0-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-data-std</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index a43aafb..cffa0f4 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -38,8 +38,9 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -117,24 +118,16 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
-                options.port);
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
 
-        JobSpecification job = createJob(
-                parseFileSplits(options.inFileCustomerSplits),
-                parseFileSplits(options.inFileOrderSplits),
-                parseFileSplits(options.outFileSplits),
-                options.numJoinPartitions, options.algo,
-                options.graceInputSize, options.graceRecordsPerFrame,
-                options.graceFactor, options.memSize, options.tableSize,
-                options.hasGroupBy);
+        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
+                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
+                options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(
-                options.app,
-                job,
-                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
-                        .noneOf(JobFlag.class));
+        JobId jobId = hcc.createJob(options.app, job,
+                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.start(jobId);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
@@ -148,104 +141,65 @@
             String s = splits[i].trim();
             int idx = s.indexOf(':');
             if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s
-                        + " not well formed");
+                throw new IllegalArgumentException("File split " + s + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
-                    new File(s.substring(idx + 1))));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
         }
         return fSplits;
     }
 
-    private static JobSpecification createJob(FileSplit[] customerSplits,
-            FileSplit[] orderSplits, FileSplit[] resultSplits,
-            int numJoinPartitions, String algo, int graceInputSize,
-            int graceRecordsPerFrame, double graceFactor, int memSize,
-            int tableSize, boolean hasGroupBy) throws HyracksDataException {
+    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
+            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
+            double graceFactor, int memSize, int tableSize, boolean hasGroupBy) throws HyracksDataException {
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(
-                customerSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
 
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
-                orderSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] {
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE,
-                        UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
-                spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         createPartitionConstraint(spec, ordScanner, orderSplits);
 
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(
-                spec, custSplitsProvider, new DelimitedDataTupleParserFactory(
-                        new IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE }, '|'),
-                custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         createPartitionConstraint(spec, custScanner, customerSplits);
 
         IOperatorDescriptor join;
 
         if ("nestedloop".equalsIgnoreCase(algo)) {
-            join = new NestedLoopJoinOperatorDescriptor(spec,
-                    new JoinComparatorFactory(
-                            UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                    custOrderJoinDesc, memSize);
+            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize);
 
         } else if ("gracehash".equalsIgnoreCase(algo)) {
             join = new GraceHashJoinOperatorDescriptor(
@@ -256,8 +210,9 @@
                     graceFactor,
                     new int[] { 0 },
                     new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     custOrderJoinDesc);
 
         } else if ("hybridhash".equalsIgnoreCase(algo)) {
@@ -269,8 +224,9 @@
                     graceFactor,
                     new int[] { 0 },
                     new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     custOrderJoinDesc);
 
         } else {
@@ -278,81 +234,71 @@
                     spec,
                     new int[] { 0 },
                     new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     custOrderJoinDesc, 6000000);
         }
 
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, join,
-                numJoinPartitions);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
 
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
 
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(
-                spec,
-                new FieldHashPartitionComputerFactory(
-                        new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 0);
 
         IOperatorDescriptor endingOp = join;
 
         if (hasGroupBy) {
 
-            RecordDescriptor groupResultDesc = new RecordDescriptor(
-                    new ISerializerDeserializer[] {
-                            UTF8StringSerializerDeserializer.INSTANCE,
-                            IntegerSerializerDeserializer.INSTANCE });
+            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
             HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
                     spec,
                     new int[] { 6 },
-                    new FieldHashPartitionComputerFactory(
-                            new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }),
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     new MultiAggregatorFactory(
                             new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
                     groupResultDesc, 16);
             createPartitionConstraint(spec, gby, resultSplits);
 
-            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(
-                    spec,
-                    new FieldHashPartitionComputerFactory(
-                            new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }));
             spec.connect(joinGroupConn, join, 0, gby, 0);
 
             endingOp = gby;
         }
 
-        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
-                resultSplits);
-        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(
-                spec, outSplitProvider);
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
         createPartitionConstraint(spec, writer, resultSplits);
 
-        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(
-                spec);
+        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
         spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
 
         spec.addRoot(writer);
         return spec;
     }
 
-    private static void createPartitionConstraint(JobSpecification spec,
-            IOperatorDescriptor op, FileSplit[] splits) {
+    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
         String[] parts = new String[splits.length];
         for (int i = 0; i < splits.length; ++i) {
             parts[i] = splits[i].getNodeName();
         }
-        PartitionConstraintHelper
-                .addAbsoluteLocationConstraint(spec, op, parts);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
     }
 
     static class JoinComparatorFactory implements ITuplePairComparatorFactory {
@@ -362,8 +308,7 @@
         private final int pos0;
         private final int pos1;
 
-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
-                int pos0, int pos1) {
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
             this.bFactory = bFactory;
             this.pos0 = pos0;
             this.pos1 = pos1;
@@ -371,8 +316,7 @@
 
         @Override
         public ITuplePairComparator createTuplePairComparator() {
-            return new JoinComparator(bFactory.createBinaryComparator(), pos0,
-                    pos1);
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
         }
     }
 
@@ -382,16 +326,14 @@
         private final int field0;
         private final int field1;
 
-        public JoinComparator(IBinaryComparator bComparator, int field0,
-                int field1) {
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
             this.bComparator = bComparator;
             this.field0 = field0;
             this.field1 = field1;
         }
 
         @Override
-        public int compare(IFrameTupleAccessor accessor0, int tIndex0,
-                IFrameTupleAccessor accessor1, int tIndex1) {
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
             int tStart0 = accessor0.getTupleStartOffset(tIndex0);
             int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
 
@@ -406,9 +348,8 @@
             int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
             int fLen1 = fEnd1 - fStart1;
 
-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
-                    + fStartOffset0, fLen0, accessor1.getBuffer().array(),
-                    fStart1 + fStartOffset1, fLen1);
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
             if (c != 0) {
                 return c;
             }