Changes to fix issue 802
commit fa829826d717123e415a994cc69e47c7ec1833f0
Author: Young-Seok <kisskys@gmail.com>
Date:   Thu Sep 25 10:09:20 2014 -0700

Change-Id: I8f6153e482e2079a0654263802ecd4c396291082
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/140
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Taewoo Kim <wangsaeu@gmail.com>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index f8a43d0..516472d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -48,6 +48,7 @@
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import edu.uci.ics.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
 import edu.uci.ics.asterix.metadata.MetadataException;
@@ -65,10 +66,10 @@
 import edu.uci.ics.asterix.metadata.entities.ExternalFile;
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Index;
+import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.external.IndexingConstants;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.EndFeedMessage;
@@ -170,9 +171,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
-import edu.uci.ics.hyracks.storage.am.rtree.linearize.HilbertDoubleComparatorFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.linearize.ZCurveDoubleComparatorFactory;
-import edu.uci.ics.hyracks.storage.am.rtree.linearize.ZCurveIntComparatorFactory;
 
 public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
     private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
@@ -1946,16 +1944,8 @@
 
     public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
             throws AlgebricksException {
-        if (numKeyFields / 2 == 2 && (keyType == ATypeTag.DOUBLE)) {
-            return new HilbertDoubleComparatorFactory(2);
-        } else if (keyType == ATypeTag.DOUBLE) {
-            return new ZCurveDoubleComparatorFactory(numKeyFields / 2);
-        } else if (keyType == ATypeTag.INT8 || keyType == ATypeTag.INT16 || keyType == ATypeTag.INT32
-                || keyType == ATypeTag.INT64) {
-            return new ZCurveIntComparatorFactory(numKeyFields / 2);
-        } else {
-            throw new AlgebricksException("Cannot propose linearizer for key with type " + keyType + ".");
-        }
+        return AqlLinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
+                numKeyFields / 2);
     }
 
     /**
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlLinearizeComparatorFactoryProvider.java b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlLinearizeComparatorFactoryProvider.java
new file mode 100644
index 0000000..f331d34
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/AqlLinearizeComparatorFactoryProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.formats.nontagged;
+
+import java.io.Serializable;
+
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.ILinearizeComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.linearize.HilbertDoubleComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.linearize.ZCurveDoubleComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.rtree.linearize.ZCurveIntComparatorFactory;
+
+public class AqlLinearizeComparatorFactoryProvider implements ILinearizeComparatorFactoryProvider, Serializable {
+
+    private static final long serialVersionUID = 1L;
+    public static final AqlLinearizeComparatorFactoryProvider INSTANCE = new AqlLinearizeComparatorFactoryProvider();
+
+    private AqlLinearizeComparatorFactoryProvider() {
+    }
+
+    @Override
+    public ILinearizeComparatorFactory getLinearizeComparatorFactory(Object type, boolean ascending, int dimension)
+            throws AlgebricksException {
+        ATypeTag typeTag = (ATypeTag) type;
+
+        if (dimension == 2 && (typeTag == ATypeTag.DOUBLE)) {
+            return addOffset(new HilbertDoubleComparatorFactory(2), ascending);
+        } else if (typeTag == ATypeTag.DOUBLE) {
+            return addOffset(new ZCurveDoubleComparatorFactory(dimension), ascending);
+        } else if (typeTag == ATypeTag.INT8 || typeTag == ATypeTag.INT16 || typeTag == ATypeTag.INT32
+                || typeTag == ATypeTag.INT64) {
+            return addOffset(new ZCurveIntComparatorFactory(dimension), ascending);
+        } else {
+            throw new AlgebricksException("Cannot propose linearizer for key with type " + typeTag + " and dimension "
+                    + dimension + ".");
+        }
+    }
+
+    private ILinearizeComparatorFactory addOffset(final IBinaryComparatorFactory inst, final boolean ascending) {
+        return new ILinearizeComparatorFactory() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ILinearizeComparator createBinaryComparator() {
+                final ILinearizeComparator bc = (ILinearizeComparator) inst.createBinaryComparator();
+                final int dimension = bc.getDimensions();
+                if (ascending) {
+                    return new ILinearizeComparator() {
+
+                        @Override
+                        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                            return bc.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+                        }
+
+                        @Override
+                        public int getDimensions() {
+                            // TODO Auto-generated method stub
+                            return dimension;
+                        }
+                    };
+                } else {
+                    return new ILinearizeComparator() {
+
+                        @Override
+                        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                            return -bc.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+                        }
+
+                        @Override
+                        public int getDimensions() {
+                            // TODO Auto-generated method stub
+                            return dimension;
+                        }
+                    };
+                }
+            }
+        };
+    }
+}