[ASTERIXDB-2149] Refactor key normalizer with longer keys
- user model changes: no
- storage format changes: no
- interface changes: yes. The interface of key normalized is changed.
Details:
- Refactored key normalizer to work with longer normalized keys composed
of multiple integers.
- Add tests for key normalizers
- Add key normalizer for UUID type to improve sort performance.
Change-Id: Idba747285af74195ef9953ed9bf5f6f217511380
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2225
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..c8a774d
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.nontagged.keynormalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
+import org.apache.hyracks.dataflow.common.data.normalizers.Integer64NormalizedKeyComputerFactory;
+
+public class AUUIDNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getNormalizedKeyLength() {
+ return 4;
+ }
+
+ @Override
+ public boolean isDecisive() {
+ return true;
+ }
+ };
+
+ private final INormalizedKeyComputerFactory int64NormalizerFactory;
+ private final int int64NormalizedKeyLength;
+
+ public AUUIDNormalizedKeyComputerFactory() {
+ int64NormalizerFactory = new Integer64NormalizedKeyComputerFactory();
+ int64NormalizedKeyLength = int64NormalizerFactory.getNormalizedKeyProperties().getNormalizedKeyLength();
+ }
+
+ @Override
+ public INormalizedKeyComputer createNormalizedKeyComputer() {
+ final INormalizedKeyComputer nkc = int64NormalizerFactory.createNormalizedKeyComputer();
+ return new INormalizedKeyComputer() {
+
+ @Override
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ // normalize msb
+ nkc.normalize(bytes, start, length, normalizedKeys, keyStart);
+ // normalize lsb
+ nkc.normalize(bytes, start + Long.BYTES, length - Long.BYTES, normalizedKeys,
+ keyStart + int64NormalizedKeyLength);
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
+ };
+
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
index de214fc..c676d13 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
/**
* This class uses a decorator pattern to wrap an ASC ordered INomralizedKeyComputerFactory implementation to
@@ -41,11 +42,22 @@
return new INormalizedKeyComputer() {
@Override
- public int normalize(byte[] bytes, int start, int length) {
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
// start +1, length -1 is because in ASTERIX data format, there is always a type tag before the value
- return nkc.normalize(bytes, start + 1, length - 1);
+ nkc.normalize(bytes, start + 1, length - 1, normalizedKeys, keyStart);
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return nkc.getNormalizedKeyProperties();
}
};
+
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return nkcf.getNormalizedKeyProperties();
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
index 6e02029..86b33e8 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
/**
* This class uses a decorator pattern to wrap an ASC ordered INomralizedKeyComputerFactory implementation
@@ -30,9 +31,11 @@
private static final long serialVersionUID = 1L;
private final INormalizedKeyComputerFactory nkcf;
+ private final int normalizedKeyLength;
public AWrappedDescNormalizedKeyComputerFactory(INormalizedKeyComputerFactory nkcf) {
this.nkcf = nkcf;
+ this.normalizedKeyLength = nkcf.getNormalizedKeyProperties().getNormalizedKeyLength();
}
@Override
@@ -41,11 +44,24 @@
return new INormalizedKeyComputer() {
@Override
- public int normalize(byte[] bytes, int start, int length) {
- int key = nkc.normalize(bytes, start + 1, length - 1);
- return (int) ((long) 0xffffffff - (long) key);
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ nkc.normalize(bytes, start + 1, length - 1, normalizedKeys, keyStart);
+ for (int i = 0; i < normalizedKeyLength; i++) {
+ int key = normalizedKeys[keyStart + i];
+ normalizedKeys[keyStart + i] = ~key;
+ }
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return nkc.getNormalizedKeyProperties();
}
};
}
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return nkcf.getNormalizedKeyProperties();
+ }
+
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
index d17fc5a..d372062 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.formats.nontagged;
+import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AUUIDNormalizedKeyComputerFactory;
import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AWrappedAscNormalizedKeyComputerFactory;
import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AWrappedDescNormalizedKeyComputerFactory;
import org.apache.asterix.om.types.IAType;
@@ -59,6 +60,8 @@
return new AWrappedAscNormalizedKeyComputerFactory(new UTF8StringNormalizedKeyComputerFactory());
case BINARY:
return new AWrappedAscNormalizedKeyComputerFactory(new ByteArrayNormalizedKeyComputerFactory());
+ case UUID:
+ return new AWrappedAscNormalizedKeyComputerFactory(new AUUIDNormalizedKeyComputerFactory());
default:
return null;
}
@@ -81,6 +84,8 @@
return new AWrappedDescNormalizedKeyComputerFactory(new UTF8StringNormalizedKeyComputerFactory());
case BINARY:
return new AWrappedDescNormalizedKeyComputerFactory(new ByteArrayNormalizedKeyComputerFactory());
+ case UUID:
+ return new AWrappedDescNormalizedKeyComputerFactory(new AUUIDNormalizedKeyComputerFactory());
default:
return null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
index 7bf8255..7364bea 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -19,10 +19,7 @@
package org.apache.hyracks.api.dataflow.value;
public interface INormalizedKeyComputer {
- public int normalize(byte[] bytes, int start, int length);
+ void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart);
- default void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
- int key = normalize(bytes, start, length);
- normalizedKeys[keyStart] = key;
- }
+ INormalizedKeyProperties getNormalizedKeyProperties();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
index 901702e..a1339a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -23,20 +23,5 @@
public interface INormalizedKeyComputerFactory extends Serializable {
public INormalizedKeyComputer createNormalizedKeyComputer();
- /**
- *
- * @return The length of the normalized key in terms of integers
- */
- default int getNormalizedKeyLength() {
- return 1;
- }
-
- /**
- *
- * @return Whether we can solely rely on this normalized key to complete comparison,
- * even when two normalized keys are equal
- */
- default boolean isDecisive() {
- return false;
- }
+ public INormalizedKeyProperties getNormalizedKeyProperties();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
new file mode 100644
index 0000000..63e799a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INormalizedKeyProperties extends Serializable {
+ /**
+ *
+ * @return The length of the normalized key in terms of integers
+ */
+ public int getNormalizedKeyLength();
+
+ /**
+ *
+ * @return Whether we can solely rely on this normalized key to complete comparison,
+ * even when two normalized keys are equal
+ */
+ public boolean isDecisive();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
index 3d081af..9557245 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
@@ -21,18 +21,45 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
public class ByteArrayNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getNormalizedKeyLength() {
+ return 1;
+ }
+
+ @Override
+ public boolean isDecisive() {
+ return false;
+ }
+ };
+
public static ByteArrayNormalizedKeyComputerFactory INSTANCE = new ByteArrayNormalizedKeyComputerFactory();
@Override
public INormalizedKeyComputer createNormalizedKeyComputer() {
return new INormalizedKeyComputer() {
@Override
- public int normalize(byte[] bytes, int start, int length) {
- return ByteArrayPointable.normalize(bytes, start);
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ normalizedKeys[keyStart] = ByteArrayPointable.normalize(bytes, start);
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
}
};
}
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
index 353793b..4d5d57c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
@@ -20,28 +20,53 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
public class DoubleNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
private static final long serialVersionUID = 1L;
+ public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getNormalizedKeyLength() {
+ return 2;
+ }
+
+ @Override
+ public boolean isDecisive() {
+ return true;
+ }
+ };
+
@Override
public INormalizedKeyComputer createNormalizedKeyComputer() {
return new INormalizedKeyComputer() {
@Override
- public int normalize(byte[] bytes, int start, int length) {
- int prefix = IntegerPointable.getInteger(bytes, start);
- if (prefix >= 0) {
- return prefix ^ Integer.MIN_VALUE;
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ long value = LongPointable.getLong(bytes, start);
+ if (value >= 0) {
+ value = value ^ Long.MIN_VALUE;
} else {
- return (int) ((long) 0xffffffff - (long) prefix);
+ value = ~value;
}
+ NormalizedKeyUtils.putLongIntoNormalizedKeys(normalizedKeys, keyStart, value);
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
}
};
}
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
index eb571a5..3b0d9d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
@@ -20,28 +20,51 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class FloatNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
private static final long serialVersionUID = 1L;
+ public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getNormalizedKeyLength() {
+ return 1;
+ }
+
+ @Override
+ public boolean isDecisive() {
+ return true;
+ }
+ };
+
@Override
public INormalizedKeyComputer createNormalizedKeyComputer() {
return new INormalizedKeyComputer() {
-
@Override
- public int normalize(byte[] bytes, int start, int length) {
- int prefix = IntegerPointable.getInteger(bytes, start);
- if (prefix >= 0) {
- return prefix ^ Integer.MIN_VALUE;
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ int value = IntegerPointable.getInteger(bytes, start);
+ if (value >= 0) {
+ normalizedKeys[keyStart] = value ^ Integer.MIN_VALUE;
} else {
- return (int) ((long) 0xffffffff - (long) prefix);
+ // invert the key
+ normalizedKeys[keyStart] = ~value;
}
}
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
};
}
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index fa74665..853499f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -20,55 +20,47 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
- private static final long serialVersionUID = 8735044913496854551L;
+ private static final long serialVersionUID = 1L;
+
+ public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getNormalizedKeyLength() {
+ return 2;
+ }
+
+ @Override
+ public boolean isDecisive() {
+ return true;
+ }
+ };
@Override
public INormalizedKeyComputer createNormalizedKeyComputer() {
return new INormalizedKeyComputer() {
- private static final int POSTIVE_LONG_MASK = (3 << 30);
- private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
- private static final int NEGATIVE_LONG_MASK = (0 << 30);
+ @Override
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ long value = LongPointable.getLong(bytes, start);
+ value = value ^ Long.MIN_VALUE;
+ NormalizedKeyUtils.putLongIntoNormalizedKeys(normalizedKeys, keyStart, value);
+ }
@Override
- public int normalize(byte[] bytes, int start, int length) {
- long value = LongPointable.getLong(bytes, start);
- int highValue = (int) (value >> 32);
- if (value > Integer.MAX_VALUE) {
- /**
- * larger than Integer.MAX
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (value >=0 && value <= Integer.MAX_VALUE) {
- /**
- * smaller than Integer.MAX but >=0
- */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /**
- * less than 0: have not optimized for that
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
- }
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
}
-
- private int getKey(int value) {
- return value ^ Integer.MIN_VALUE;
- }
-
};
}
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 41c0740..60b8d7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -20,24 +20,44 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
import org.apache.hyracks.data.std.primitive.IntegerPointable;
public class IntegerNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
private static final long serialVersionUID = 1L;
+ public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getNormalizedKeyLength() {
+ return 1;
+ }
+
+ @Override
+ public boolean isDecisive() {
+ return true;
+ }
+ };
+
@Override
public INormalizedKeyComputer createNormalizedKeyComputer() {
return new INormalizedKeyComputer() {
@Override
- public int normalize(byte[] bytes, int start, int length) {
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
int value = IntegerPointable.getInteger(bytes, start);
- return value ^ Integer.MIN_VALUE;
+ normalizedKeys[keyStart] = value ^ Integer.MIN_VALUE;
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
}
};
}
@Override
- public boolean isDecisive() {
- return true;
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
index 589ae68..fefcebd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
@@ -20,18 +20,44 @@
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
import org.apache.hyracks.util.string.UTF8StringUtil;
public class UTF8StringNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
private static final long serialVersionUID = 1L;
+ public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int getNormalizedKeyLength() {
+ // TODO optimize normalize key for UTF8 to use multiple integers
+ return 1;
+ }
+
+ @Override
+ public boolean isDecisive() {
+ return false;
+ }
+ };
+
@Override
public INormalizedKeyComputer createNormalizedKeyComputer() {
return new INormalizedKeyComputer() {
@Override
- public int normalize(byte[] bytes, int start, int length) {
- return UTF8StringUtil.normalize(bytes, start);
+ public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+ normalizedKeys[keyStart] = UTF8StringUtil.normalize(bytes, start);
+ }
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
}
};
}
+
+ @Override
+ public INormalizedKeyProperties getNormalizedKeyProperties() {
+ return PROPERTIES;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
new file mode 100644
index 0000000..175b04a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.utils;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class NormalizedKeyUtils {
+
+ private NormalizedKeyUtils() {
+ }
+
+ public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
+ for (int i = 0; i < length; i++) {
+ int key1 = keys1[start1 + i];
+ int key2 = keys2[start2 + i];
+ if (key1 != key2) {
+ return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
+ }
+ }
+ return 0;
+ }
+
+ public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
+ if (keyNormalizerFactories == null) {
+ return 0;
+ }
+ for (int i = 0; i < keyNormalizerFactories.length; i++) {
+ if (!keyNormalizerFactories[i].getNormalizedKeyProperties().isDecisive()) {
+ return i;
+ }
+ }
+ return keyNormalizerFactories.length;
+ }
+
+ public static void putLongIntoNormalizedKeys(int[] normalizedKeys, int keyStart, long key) {
+ int high = (int) (key >> 32);
+ normalizedKeys[keyStart] = high;
+ int low = (int) key;
+ normalizedKeys[keyStart + 1] = low;
+ }
+
+ public static int[] createNormalizedKeyArray(INormalizedKeyComputer normalizer) {
+ if (normalizer == null) {
+ return null; //NOSONAR
+ }
+ return new int[normalizer.getNormalizedKeyProperties().getNormalizedKeyLength()];
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..2107574
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public abstract class AbstractNormalizedKeyComputerFactoryTest {
+
+ @Test
+ public void testPositive() {
+ IPointable large = getLargePositive();
+ IPointable small = getSmallPositive();
+
+ normalizeAndCompare(large, small, 1);
+ normalizeAndCompare(small, large, -1);
+ normalizeAndCompare(large, large, 0);
+ }
+
+ @Test
+ public void testNegative() {
+ IPointable small = getSmallNegative();
+ IPointable large = getLargeNegative();
+
+ normalizeAndCompare(large, small, 1);
+ normalizeAndCompare(small, large, -1);
+ normalizeAndCompare(large, large, 0);
+ }
+
+ @Test
+ public void testPositiveAndNegative() {
+ IPointable smallNegative = getSmallNegative();
+ IPointable largeNegative = getLargeNegative();
+ IPointable smallPositive = getSmallPositive();
+ IPointable largePositive = getLargePositive();
+
+ normalizeAndCompare(smallNegative, smallPositive, -1);
+ normalizeAndCompare(smallNegative, largePositive, -1);
+ normalizeAndCompare(largeNegative, smallPositive, -1);
+ normalizeAndCompare(largeNegative, largePositive, -1);
+ }
+
+ protected void normalizeAndCompare(IPointable p1, IPointable p2, int result) {
+ int[] key1 = normalize(p1);
+ int[] key2 = normalize(p2);
+ int comp = NormalizedKeyUtils.compareNormalizeKeys(key1, 0, key2, 0, key1.length);
+ if (result > 0) {
+ Assert.assertTrue(comp > 0);
+ } else if (result == 0) {
+ Assert.assertTrue(comp == 0);
+ } else {
+ Assert.assertTrue(comp < 0);
+ }
+ }
+
+ protected abstract IPointable getLargePositive();
+
+ protected abstract IPointable getSmallPositive();
+
+ protected abstract IPointable getLargeNegative();
+
+ protected abstract IPointable getSmallNegative();
+
+ protected abstract int[] normalize(IPointable value);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
index a362e60..675bf36 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.dataflow.common.data.normalizers;
-import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.*;
import java.util.Random;
@@ -36,10 +36,10 @@
@Test
public void testRandomNormalizedKey() {
for (int i = 0; i < 10; ++i) {
- ByteArrayPointable pointable1 = generateRandomByteArrayPointableWithFixLength(
- Math.abs(random.nextInt((i + 1) * 10)), random);
- ByteArrayPointable pointable2 = generateRandomByteArrayPointableWithFixLength(
- Math.abs(random.nextInt((i + 1) * 10)), random);
+ ByteArrayPointable pointable1 =
+ generateRandomByteArrayPointableWithFixLength(Math.abs(random.nextInt((i + 1) * 10)), random);
+ ByteArrayPointable pointable2 =
+ generateRandomByteArrayPointableWithFixLength(Math.abs(random.nextInt((i + 1) * 10)), random);
assertNormalizeValue(pointable1, pointable2, computer);
}
}
@@ -52,11 +52,13 @@
public static void assertNormalizeValue(ByteArrayPointable pointable1, ByteArrayPointable pointable2,
INormalizedKeyComputer computer) {
- int n1 = computer.normalize(pointable1.getByteArray(), pointable1.getStartOffset(), pointable1.getLength());
- int n2 = computer.normalize(pointable2.getByteArray(), pointable2.getStartOffset(), pointable2.getLength());
- if (n1 < n2) {
+ int[] key1 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+ int[] key2 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+ computer.normalize(pointable1.getByteArray(), pointable1.getStartOffset(), pointable1.getLength(), key1, 0);
+ computer.normalize(pointable2.getByteArray(), pointable2.getStartOffset(), pointable2.getLength(), key2, 0);
+ if (key1[0] < key2[0]) {
assertTrue(pointable1.compareTo(pointable2) < 0);
- } else if (n1 > n2) {
+ } else if (key1[0] > key2[0]) {
assertTrue(pointable1.compareTo(pointable2) > 0);
}
}
@@ -70,12 +72,15 @@
}
ByteArrayPointable ptr1 = ByteArrayPointable.generatePointableFromPureBytes(new byte[] { 0, 25, 34, 42 });
- ByteArrayPointable ptr2 = ByteArrayPointable.generatePointableFromPureBytes(
- new byte[] { (byte) 130, 25, 34, 42 });
+ ByteArrayPointable ptr2 =
+ ByteArrayPointable.generatePointableFromPureBytes(new byte[] { (byte) 130, 25, 34, 42 });
- int n1 = computer.normalize(ptr1.getByteArray(), ptr1.getStartOffset(), ptr1.getLength());
- int n2 = computer.normalize(ptr2.getByteArray(), ptr2.getStartOffset(), ptr2.getLength());
- assertTrue(n1 < n2);
+ int[] key1 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+ int[] key2 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+ computer.normalize(ptr1.getByteArray(), ptr1.getStartOffset(), ptr1.getLength(), key1, 0);
+ computer.normalize(ptr2.getByteArray(), ptr2.getStartOffset(), ptr2.getLength(), key2, 0);
+
+ assertTrue(key1[0] < key2[0]);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..012ef07
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class DoubleNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+ private final INormalizedKeyComputer normalizer =
+ new DoubleNormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+ @Override
+ protected IPointable getLargePositive() {
+ return getDoublePointable(Double.MAX_VALUE - 1);
+ }
+
+ @Override
+ protected IPointable getSmallPositive() {
+ return getDoublePointable(Double.MIN_VALUE);
+ }
+
+ @Override
+ protected IPointable getLargeNegative() {
+ return getDoublePointable(Double.MIN_VALUE * -1);
+
+ }
+
+ @Override
+ protected IPointable getSmallNegative() {
+ return getDoublePointable(Double.MAX_VALUE * -1);
+ }
+
+ private IPointable getDoublePointable(double value) {
+ DoublePointable pointable = (DoublePointable) DoublePointable.FACTORY.createPointable();
+ pointable.set(new byte[Double.BYTES], 0, Double.BYTES);
+ pointable.setDouble(value);
+ return pointable;
+ }
+
+ @Override
+ protected int[] normalize(IPointable value) {
+ int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+ normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+ return keys;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..cd6f4c4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class FloatNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+ private final INormalizedKeyComputer normalizer =
+ new FloatNormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+ @Override
+ protected IPointable getLargePositive() {
+ return getFloatPointable(Float.MAX_VALUE - 1);
+ }
+
+ @Override
+ protected IPointable getSmallPositive() {
+ return getFloatPointable(Float.MIN_VALUE);
+ }
+
+ @Override
+ protected IPointable getLargeNegative() {
+ return getFloatPointable(Float.MIN_VALUE * -1);
+
+ }
+
+ @Override
+ protected IPointable getSmallNegative() {
+ return getFloatPointable(Float.MAX_VALUE * -1);
+ }
+
+ private IPointable getFloatPointable(float value) {
+ FloatPointable pointable = (FloatPointable) FloatPointable.FACTORY.createPointable();
+ pointable.set(new byte[Float.BYTES], 0, Float.BYTES);
+ pointable.setFloat(value);
+ return pointable;
+ }
+
+ @Override
+ protected int[] normalize(IPointable value) {
+ int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+ normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+ return keys;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..6ca623d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class Integer64NormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+ private final INormalizedKeyComputer normalizer =
+ new Integer64NormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+ @Override
+ protected IPointable getLargePositive() {
+ return getLongPointable(Long.MAX_VALUE - 1);
+ }
+
+ @Override
+ protected IPointable getSmallPositive() {
+ return getLongPointable(1);
+ }
+
+ @Override
+ protected IPointable getLargeNegative() {
+ return getLongPointable(-1);
+ }
+
+ @Override
+ protected IPointable getSmallNegative() {
+ return getLongPointable(Long.MIN_VALUE + 1);
+ }
+
+ private IPointable getLongPointable(long value) {
+ LongPointable pointable = LongPointable.FACTORY.createPointable();
+ pointable.set(new byte[Long.BYTES], 0, Long.BYTES);
+ pointable.setLong(value);
+ return pointable;
+ }
+
+ @Override
+ protected int[] normalize(IPointable value) {
+ int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+ normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+ return keys;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..9122721
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class IntegerNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+ private final INormalizedKeyComputer normalizer =
+ new IntegerNormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+ @Override
+ protected IPointable getLargePositive() {
+ return getIntPointable(Integer.MAX_VALUE - 1);
+ }
+
+ @Override
+ protected IPointable getSmallPositive() {
+ return getIntPointable(1);
+ }
+
+ @Override
+ protected IPointable getLargeNegative() {
+ return getIntPointable(-1);
+
+ }
+
+ @Override
+ protected IPointable getSmallNegative() {
+ return getIntPointable(Integer.MIN_VALUE + 1);
+ }
+
+ private IPointable getIntPointable(int value) {
+ IntegerPointable pointable = (IntegerPointable) IntegerPointable.FACTORY.createPointable();
+ pointable.set(new byte[Integer.BYTES], 0, Integer.BYTES);
+ pointable.setInteger(value);
+ return pointable;
+ }
+
+ @Override
+ protected int[] normalize(IPointable value) {
+ int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+ normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+ return keys;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
index 5cc854c..10cc954 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -42,6 +42,7 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
@@ -61,12 +62,17 @@
/**
* @param spec
- * @param nInputs Number of inputs
- * @param compareFields The compare field list of each input.
- * All the fields order should be the same with the comparatorFactories
- * @param extraFields Extra field that
- * @param firstKeyNormalizerFactory Normalizer for the first comparison key.
- * @param comparatorFactories A list of comparators for each field
+ * @param nInputs
+ * Number of inputs
+ * @param compareFields
+ * The compare field list of each input.
+ * All the fields order should be the same with the comparatorFactories
+ * @param extraFields
+ * Extra field that
+ * @param firstKeyNormalizerFactory
+ * Normalizer for the first comparison key.
+ * @param comparatorFactories
+ * A list of comparators for each field
* @param recordDescriptor
* @throws HyracksException
*/
@@ -147,7 +153,10 @@
public static class IntersectOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
- private enum ACTION {FAILED, CLOSE}
+ private enum ACTION {
+ FAILED,
+ CLOSE
+ }
private final int inputArity;
private final int[][] compareFields;
@@ -158,6 +167,7 @@
private final FrameTupleAppender appender;
private final INormalizedKeyComputer firstKeyNormalizerComputer;
+ private final boolean normalizedKeyDecisive;
private final IBinaryComparator[] comparators;
private boolean done = false;
@@ -186,8 +196,12 @@
}
this.allProjectFields = projectedFields;
this.firstKeyNormalizerComputer =
- firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
-
+ firstKeyNormalizerFactory != null ? firstKeyNormalizerFactory.createNormalizedKeyComputer() : null;
+ this.normalizedKeyDecisive =
+ firstKeyNormalizerFactory != null
+ ? firstKeyNormalizerFactory.getNormalizedKeyProperties().isDecisive()
+ && compareFields[0].length == 1
+ : false;
comparators = new IBinaryComparator[compareFields[0].length];
for (int i = 0; i < comparators.length; i++) {
comparators[i] = comparatorFactory[i].createBinaryComparator();
@@ -213,6 +227,11 @@
@Override
public IFrameWriter getInputFrameWriter(final int index) {
return new IFrameWriter() {
+ private final int[] normalizedKey1 =
+ NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer);
+ private final int[] normalizedKey2 =
+ NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer);
+
@Override
public void open() throws HyracksDataException {
if (index == 0) {
@@ -273,9 +292,8 @@
continue;
}
while (tupleIndexMarker[i] < refAccessor[i].getTupleCount()) {
- int cmp =
- compare(i, refAccessor[i], tupleIndexMarker[i], maxInput, refAccessor[maxInput],
- tupleIndexMarker[maxInput]);
+ int cmp = compare(i, refAccessor[i], tupleIndexMarker[i], maxInput,
+ refAccessor[maxInput], tupleIndexMarker[maxInput]);
if (cmp == 0) {
match++;
break;
@@ -313,13 +331,14 @@
private int compare(int input1, FrameTupleAccessor frameTupleAccessor1, int tid1, int input2,
FrameTupleAccessor frameTupleAccessor2, int tid2) throws HyracksDataException {
- int firstNorm1 = getFirstNorm(input1, frameTupleAccessor1, tid1);
- int firstNorm2 = getFirstNorm(input2, frameTupleAccessor2, tid2);
-
- if (firstNorm1 < firstNorm2) {
- return -1;
- } else if (firstNorm1 > firstNorm2) {
- return 1;
+ if (firstKeyNormalizerComputer != null) {
+ getFirstNorm(input1, frameTupleAccessor1, tid1, normalizedKey1);
+ getFirstNorm(input2, frameTupleAccessor2, tid2, normalizedKey2);
+ int cmp = NormalizedKeyUtils.compareNormalizeKeys(normalizedKey1, 0, normalizedKey2, 0,
+ normalizedKey1.length);
+ if (cmp != 0 || normalizedKeyDecisive) {
+ return cmp;
+ }
}
for (int i = 0; i < comparators.length; i++) {
@@ -337,12 +356,12 @@
return 0;
}
- private int getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1) {
- return firstKeyNormalizerComputer == null ?
- 0 :
- firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
- frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]),
- frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0]));
+ private void getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1, int[] keys) {
+ if (firstKeyNormalizerComputer != null) {
+ firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
+ frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]),
+ frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0]), keys, 0);
+ }
}
private int findMaxInput() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index eead09e..f51271e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
@@ -100,7 +101,7 @@
int runningNormalizedKeyTotalLength = 0;
if (normalizedKeyComputerFactories != null) {
- int decisivePrefixLength = getDecisivePrefixLength(normalizedKeyComputerFactories);
+ int decisivePrefixLength = NormalizedKeyUtils.getDecisivePrefixLength(normalizedKeyComputerFactories);
// we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
// ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
@@ -112,7 +113,8 @@
for (int i = 0; i < normalizedKeys; i++) {
this.nkcs[i] = normalizedKeyComputerFactories[i].createNormalizedKeyComputer();
- this.normalizedKeyLength[i] = normalizedKeyComputerFactories[i].getNormalizedKeyLength();
+ this.normalizedKeyLength[i] =
+ normalizedKeyComputerFactories[i].getNormalizedKeyProperties().getNormalizedKeyLength();
runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
}
this.normalizedKeysDecisive = decisivePrefixLength == comparatorFactories.length;
@@ -246,8 +248,9 @@
protected final int compare(int[] tPointers1, int tp1, int[] tPointers2, int tp2) throws HyracksDataException {
if (nkcs != null) {
- int cmpNormalizedKey = compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
- tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
+ int cmpNormalizedKey =
+ NormalizedKeyUtils.compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
+ tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
if (cmpNormalizedKey != 0 || normalizedKeysDecisive) {
return cmpNormalizedKey;
}
@@ -283,29 +286,6 @@
return 0;
}
- public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
- for (int i = 0; i < length; i++) {
- int key1 = keys1[start1 + i];
- int key2 = keys2[start2 + i];
- if (key1 != key2) {
- return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
- }
- }
- return 0;
- }
-
- public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
- if (keyNormalizerFactories == null) {
- return 0;
- }
- for (int i = 0; i < keyNormalizerFactories.length; i++) {
- if (!keyNormalizerFactories[i].isDecisive()) {
- return i;
- }
- }
- return keyNormalizerFactories.length;
- }
-
protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
System.arraycopy(pointers1, pos1 * ptrSize, tmpPointer, 0, ptrSize);
System.arraycopy(pointers2, pos2 * ptrSize, pointers1, pos1 * ptrSize, ptrSize);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index f001bed..3cbe86b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
import org.apache.hyracks.dataflow.std.util.ReferenceEntry;
import org.apache.hyracks.dataflow.std.util.ReferencedPriorityQueue;
@@ -41,6 +42,8 @@
private final int[] sortFields;
private final IBinaryComparator[] comparators;
private final INormalizedKeyComputer nmkComputer;
+ private final int normalizedKeyLength;
+ private final boolean normalizedKeyDecisive;
private final RecordDescriptor recordDesc;
private final int topK;
private int tupleCount;
@@ -64,6 +67,14 @@
this.sortFields = sortFields;
this.comparators = comparators;
this.nmkComputer = nmkComputer;
+ this.normalizedKeyLength =
+ nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().getNormalizedKeyLength() : 0;
+ // right now we didn't take multiple key normalizers for frame merger, since during this step it won't be
+ // too many cache misses (merging multiple runs sequentially).
+ // but still, we can apply a special optimization if there is only 1 sort field
+ this.normalizedKeyDecisive =
+ nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().isDecisive() && comparators.length == 1
+ : false;
this.recordDesc = recordDesc;
this.topK = topK;
}
@@ -153,8 +164,7 @@
}
private static void closeRun(int index, List<? extends IFrameReader> runCursors,
- IFrameTupleAccessor[] tupleAccessors)
- throws HyracksDataException {
+ IFrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
if (runCursors.get(index) != null) {
runCursors.get(index).close();
runCursors.set(index, null);
@@ -164,32 +174,39 @@
private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
return new Comparator<ReferenceEntry>() {
+ @Override
public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
- int nmk1 = tp1.getNormalizedKey();
- int nmk2 = tp2.getNormalizedKey();
- if (nmk1 != nmk2) {
- return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
+ int[] tPointers1 = tp1.getTPointers();
+ int[] tPointers2 = tp2.getTPointers();
+ int cmp = NormalizedKeyUtils.compareNormalizeKeys(tPointers1, 0, tPointers2, 0, normalizedKeyLength);
+ if (cmp != 0) {
+ return cmp;
+ } else if (normalizedKeyDecisive) {
+ // we further compare the run id
+ return compareRun(tp1, tp2);
}
IFrameTupleAccessor fta1 = tp1.getAccessor();
IFrameTupleAccessor fta2 = tp2.getAccessor();
byte[] b1 = fta1.getBuffer().array();
byte[] b2 = fta2.getBuffer().array();
- int[] tPointers1 = tp1.getTPointers();
- int[] tPointers2 = tp2.getTPointers();
for (int f = 0; f < sortFields.length; ++f) {
int c;
try {
- c = comparators[f].compare(b1, tPointers1[2 * f + 1], tPointers1[2 * f + 2], b2,
- tPointers2[2 * f + 1], tPointers2[2 * f + 2]);
+ c = comparators[f].compare(b1, tPointers1[2 * f + normalizedKeyLength],
+ tPointers1[2 * f + normalizedKeyLength + 1], b2,
+ tPointers2[2 * f + normalizedKeyLength], tPointers2[2 * f + normalizedKeyLength + 1]);
if (c != 0) {
return c;
}
} catch (HyracksDataException e) {
throw new IllegalArgumentException(e);
}
-
}
+ return compareRun(tp1, tp2);
+ }
+
+ private int compareRun(ReferenceEntry tp1, ReferenceEntry tp2) {
int runid1 = tp1.getRunid();
int runid2 = tp2.getRunid();
return runid1 < runid2 ? -1 : (runid1 == runid2 ? 0 : 1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index b02f859..46d62a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
import org.apache.hyracks.dataflow.std.structures.IResetableComparable;
@@ -65,7 +66,7 @@
@Override
public int compareTo(HeapEntry o) {
- int cmpNormalizedKey = AbstractFrameSorter.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
+ int cmpNormalizedKey = NormalizedKeyUtils.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
return cmpNormalizedKey;
}
@@ -141,7 +142,7 @@
int runningNormalizedKeyTotalLength = 0;
if (keyNormalizerFactories != null) {
- int decisivePrefixLength = AbstractFrameSorter.getDecisivePrefixLength(keyNormalizerFactories);
+ int decisivePrefixLength = NormalizedKeyUtils.getDecisivePrefixLength(keyNormalizerFactories);
// we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
// ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
@@ -153,7 +154,8 @@
for (int i = 0; i < normalizedKeys; i++) {
this.nkcs[i] = keyNormalizerFactories[i].createNormalizedKeyComputer();
- this.normalizedKeyLength[i] = keyNormalizerFactories[i].getNormalizedKeyLength();
+ this.normalizedKeyLength[i] =
+ keyNormalizerFactories[i].getNormalizedKeyProperties().getNormalizedKeyLength();
runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
}
this.normalizedKeyDecisive = decisivePrefixLength == comparatorFactories.length;
@@ -225,7 +227,7 @@
private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int[] nmkey, HeapEntry maxEntry)
throws HyracksDataException {
int cmpNormalizedKey =
- AbstractFrameSorter.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
+ NormalizedKeyUtils.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
return cmpNormalizedKey;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
index 8eb5c2e..4fa4fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -26,14 +26,17 @@
private final int runid;
private IFrameTupleAccessor acccessor;
private int tupleIndex;
- private int[] tPointers;
+ private final int[] tPointers;
+ private final int normalizedKeyLength;
public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
INormalizedKeyComputer nmkComputer) {
super();
this.runid = runid;
this.acccessor = fta;
- this.tPointers = new int[1 + 2 * keyFields.length];
+ this.normalizedKeyLength =
+ nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().getNormalizedKeyLength() : 0;
+ this.tPointers = new int[normalizedKeyLength + 2 * keyFields.length];
if (fta != null) {
initTPointer(fta, tupleIndex, keyFields, nmkComputer);
}
@@ -59,10 +62,6 @@
return tupleIndex;
}
- public int getNormalizedKey() {
- return tPointers[0];
- }
-
public void setTupleIndex(int tupleIndex, int[] keyFields, INormalizedKeyComputer nmkComputer) {
initTPointer(acccessor, tupleIndex, keyFields, nmkComputer);
}
@@ -73,14 +72,11 @@
byte[] b1 = fta.getBuffer().array();
for (int f = 0; f < keyFields.length; ++f) {
int fIdx = keyFields[f];
- tPointers[2 * f + 1] = fta.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
- tPointers[2 * f + 2] = fta.getFieldLength(tupleIndex, fIdx);
- if (f == 0) {
- if (nmkComputer != null) {
- tPointers[0] = nmkComputer.normalize(b1, tPointers[1], tPointers[2]);
- } else {
- tPointers[0] = 0;
- }
+ tPointers[2 * f + normalizedKeyLength] = fta.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+ tPointers[2 * f + normalizedKeyLength + 1] = fta.getFieldLength(tupleIndex, fIdx);
+ if (f == 0 && nmkComputer != null) {
+ nmkComputer.normalize(b1, tPointers[normalizedKeyLength], tPointers[normalizedKeyLength + 1], tPointers,
+ 0);
}
}
}