fix raw binary comparator
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index aa50645..88edc91 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -65,7 +65,6 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -97,6 +96,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ClearStateOperatorDescriptor;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 32494dc..103c1b6 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -52,7 +52,6 @@
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
@@ -64,6 +63,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 26142a3..308f422 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -34,7 +34,6 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -44,6 +43,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ec0cff4..718a271 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -33,7 +33,6 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -43,6 +42,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index e918ede..75635c9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -33,7 +33,6 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -43,6 +42,7 @@
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
index 3478713..97cea99 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -19,9 +19,9 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.hdfs.lib.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.api.graph.NormalizedKeyComputer;
import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.RawBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.runtime.touchpoint.RawNormalizedKeyComputerFactory;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNormalizedKeyComputerFactory;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawBinaryComparatorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawBinaryComparatorFactory.java
new file mode 100644
index 0000000..c0173ed
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawBinaryComparatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+
+public class RawBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+ private static final long serialVersionUID = 1L;
+ public static IBinaryComparatorFactory INSTANCE = new RawBinaryComparatorFactory();
+
+ private RawBinaryComparatorFactory() {
+ }
+
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int commonLength = Math.min(l1, l2);
+ for (int i = 0; i < commonLength; i++) {
+ if (b1[s1 + i] != b2[s2 + i]) {
+ return (b1[s1 + i] & 0xff) - (b2[s2 + i] & 0xff);
+ }
+ }
+ int difference = l1 - l2;
+ return difference == 0 ? 0 : (difference > 0 ? 1 : -1);
+ }
+
+ };
+ }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawNormalizedKeyComputerFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawNormalizedKeyComputerFactory.java
index 28efc90..6aa4af9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawNormalizedKeyComputerFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/RawNormalizedKeyComputerFactory.java
@@ -17,7 +17,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class RawNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
private static final long serialVersionUID = 1L;
@@ -33,19 +32,14 @@
@Override
public int normalize(byte[] bytes, int start, int length) {
- int value = 0;
- if (length > 4) {
- value = IntegerSerializerDeserializer.getInt(bytes, start);
- } else {
- for (int i = start; i < start + length; i++) {
- value = value << 8;
- value += bytes[i] & 0xff;
- }
- for (int i = 0; i < 4 - length; i++) {
- value = value << 8;
+ int nk = 0;
+ for (int i = 0; i < 4; i++) {
+ nk <<= 8;
+ if (i < length) {
+ nk += (bytes[start + i] & 0xff);
}
}
- return value ^ Integer.MIN_VALUE;
+ return nk;
}
};