address Vinayak's major comment

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_opentype@301 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql
index 73511a7..e29ccdc 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-c2o.aql
@@ -28,15 +28,15 @@
 create dataset testds2(testtype2) partitioned by key id;
  
 insert into dataset testds (
-{ "id": "001", "name": "Person Three", "hobby": "music"}
+{ "hobby": "music", "id": "001", "name": "Person Three"}
 );
 
 insert into dataset testds (
-{ "id": "002", "name": "Person One", "hobby": "sports"}
+{ "name": "Person One", "id": "002", "hobby": "sports"}
 );
 
 insert into dataset testds (
-{ "id": "003", "name": "Person Two", "hobby": "movie"}
+{ "id": "003", "hobby": "movie", "name": "Person Two"}
 );
 
 insert into dataset testds (
@@ -44,7 +44,7 @@
 );
  
 insert into dataset testds (
-{ "id": "005", "name": "Person Five"}
+{ "name": "Person Five", "id": "005"}
 ); 
  
 insert into dataset testds2 (
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
index 927c1f0..a6ed9cc 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2c.aql
@@ -16,9 +16,9 @@
 }
 
 create type testtype2 as closed {
+  hobby: string?,
   id: string,
-  name: string,
-  hobby: string?
+  name: string
 }
 
 create dataset testds(testtype) partitioned by key id;
@@ -26,7 +26,7 @@
 create dataset testds2(testtype2) partitioned by key id; 
  
 insert into dataset testds (
-{ "id": "001", "name": "Person Three", "hobby": "music"}
+{ "id": "001",  "hobby": "music", "name": "Person Three"}
 );
 
 insert into dataset testds (
@@ -38,7 +38,7 @@
 );
 
 insert into dataset testds (
-{ "id": "004", "name": "Person Three", "hobby": "swimming"}
+{ "name": "Person Three", "hobby": "swimming", "id": "004"}
 );
 
 insert into dataset testds (
diff --git a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2o.aql b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2o.aql
index 0ed0096..545790b 100644
--- a/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2o.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/dml/opentype-o2o.aql
@@ -30,7 +30,7 @@
 );
 
 insert into dataset testds (
-{ "name": "Person Two", "id": "002",  "hobby": "football", "city":"irvine"}
+{ "hobby": "football", "city":"irvine", "name": "Person Two", "id": "002"}
 );
 
 insert into dataset testds (
@@ -38,7 +38,7 @@
 );
 
 insert into dataset testds (
-{ "name": "Person Four", "id": "004",  "hobby": "swimming", "phone":"102-304-506"}
+{ "hobby": "swimming", "name": "Person Four", "id": "004", "phone":"102-304-506"}
 );
 
 insert into dataset testds2 (
diff --git a/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm
index 34023cd..d9c778b 100644
--- a/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm
+++ b/asterix-app/src/test/resources/runtimets/results/dml/opentype-o2c.adm
@@ -1,5 +1,5 @@
-{ "id": "001", "name": "Person Three", "hobby": "music" }
-{ "id": "002", "name": "Person Three", "hobby": "football" }
-{ "id": "003", "name": "Person Three", "hobby": "movie" }
-{ "id": "004", "name": "Person Three", "hobby": "swimming" }
-{ "id": "005", "name": "Person Five", "hobby": null }
+{ "hobby": "music", "id": "001", "name": "Person Three" }
+{ "hobby": "football", "id": "002", "name": "Person Three" }
+{ "hobby": "movie", "id": "003", "name": "Person Three" }
+{ "hobby": "swimming", "id": "004", "name": "Person Three" }
+{ "hobby": null, "id": "005", "name": "Person Five" }
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
index d3f03de..bb7d5e3 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/util/ARecordCaster.java
@@ -28,18 +28,24 @@
 import edu.uci.ics.asterix.om.types.AUnionType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IValueReference;
 
 public class ARecordCaster {
 
     // describe closed fields in the required type
     private int[] fieldPermutation;
+    private boolean[] optionalFields;
 
     // describe fields (open or not) in the input records
     private boolean[] openFields;
-    private boolean[] optionalFields;
+    private int[] fieldNamesSortedIndex;
 
     private List<SimpleValueReference> reqFieldNames = new ArrayList<SimpleValueReference>();
+    private int[] reqFieldNamesSortedIndex;
     private List<SimpleValueReference> reqFieldTypeTags = new ArrayList<SimpleValueReference>();
     private ARecordType cachedReqType = null;
 
@@ -52,6 +58,8 @@
     private SimpleValueReference nullTypeTag = new SimpleValueReference();
 
     private int numInputFields = 0;
+    private IBinaryComparator fieldNameComparator = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY)
+            .createBinaryComparator();
 
     public ARecordCaster() {
         try {
@@ -78,6 +86,7 @@
 
         if (openFields == null || numInputFields > openFields.length) {
             openFields = new boolean[numInputFields];
+            fieldNamesSortedIndex = new int[numInputFields];
         }
         if (cachedReqType == null || !reqType.equals(cachedReqType)) {
             loadRequiredType(reqType);
@@ -94,6 +103,8 @@
             openFields[i] = true;
         for (int i = 0; i < fieldPermutation.length; i++)
             fieldPermutation[i] = -1;
+        for (int i = 0; i < numInputFields; i++)
+            fieldNamesSortedIndex[i] = i;
     }
 
     private void loadRequiredType(ARecordType reqType) throws IOException {
@@ -138,62 +149,57 @@
             typeNamePointable.reset(buffer, nameStart, nameEnd - nameStart);
             reqFieldNames.add(typeNamePointable);
         }
+
+        reqFieldNamesSortedIndex = new int[reqFieldNames.size()];
+        for (int i = 0; i < reqFieldNamesSortedIndex.length; i++)
+            reqFieldNamesSortedIndex[i] = i;
+        // sort the field name index
+        quickSort(reqFieldNamesSortedIndex, reqFieldNames, 0, reqFieldNamesSortedIndex.length - 1);
     }
 
     private void matchClosedPart(List<SimpleValueReference> fieldNames, List<SimpleValueReference> fieldTypeTags,
             List<SimpleValueReference> fieldValues) {
-        // forward match: match from actual to required
-        boolean matched = false;
-        for (int i = 0; i < numInputFields; i++) {
-            SimpleValueReference fieldName = fieldNames.get(i);
-            SimpleValueReference fieldTypeTag = fieldTypeTags.get(i);
-            matched = false;
-            for (int j = 0; j < reqFieldNames.size(); j++) {
-                SimpleValueReference reqFieldName = reqFieldNames.get(j);
-                SimpleValueReference reqFieldTypeTag = reqFieldTypeTags.get(j);
-                if (fieldName.equals(reqFieldName) && fieldTypeTag.equals(reqFieldTypeTag)) {
-                    fieldPermutation[j] = i;
-                    openFields[i] = false;
-                    matched = true;
-                    break;
+        // sort-merge based match
+        quickSort(fieldNamesSortedIndex, fieldNames, 0, numInputFields-1);
+        int fnStart = 0;
+        int reqFnStart = 0;
+        while (fnStart < numInputFields && reqFnStart < reqFieldNames.size()) {
+            int fnPos = fieldNamesSortedIndex[fnStart];
+            int reqFnPos = reqFieldNamesSortedIndex[reqFnStart];
+            int c = compare(fieldNames.get(fnPos), reqFieldNames.get(reqFnPos));
+            if (c == 0) {
+                SimpleValueReference fieldTypeTag = fieldTypeTags.get(fnPos);
+                SimpleValueReference reqFieldTypeTag = reqFieldTypeTags.get(reqFnPos);
+                if (fieldTypeTag.equals(reqFieldTypeTag) || (
+                // match the null type of optional field
+                        optionalFields[reqFnPos] && fieldTypeTag.equals(nullTypeTag))) {
+                    fieldPermutation[reqFnPos] = fnPos;
+                    openFields[fnPos] = false;
                 }
+                fnStart++;
+                reqFnStart++;
             }
-            if (matched)
-                continue;
-            // the input has extra fields
-            if (!cachedReqType.isOpen())
+            if (c > 0)
+                reqFnStart++;
+            if (c < 0)
+                fnStart++;
+        }
+
+        // check unmatched fields in the input type
+        for (int i = 0; i < openFields.length; i++) {
+            if (openFields[i] == true && !cachedReqType.isOpen())
                 throw new IllegalStateException("type mismatch: including extra closed fields");
         }
 
-        // backward match: match from required to actual
-        for (int i = 0; i < reqFieldNames.size(); i++) {
-            SimpleValueReference reqFieldName = reqFieldNames.get(i);
-            SimpleValueReference reqFieldTypeTag = reqFieldTypeTags.get(i);
-            matched = false;
-            for (int j = 0; j < numInputFields; j++) {
-                SimpleValueReference fieldName = fieldNames.get(j);
-                SimpleValueReference fieldTypeTag = fieldTypeTags.get(j);
-                if (fieldName.equals(reqFieldName)) {
-                    if (fieldTypeTag.equals(reqFieldTypeTag)) {
-                        matched = true;
-                        break;
-                    }
-
-                    // match the null type of optional field
-                    if (optionalFields[i] && fieldTypeTag.equals(nullTypeTag)) {
-                        matched = true;
-                        break;
-                    }
+        // check unmatched fields in the required type
+        for (int i = 0; i < fieldPermutation.length; i++) {
+            if (fieldPermutation[i] < 0) {
+                IAType t = cachedReqType.getFieldTypes()[i];
+                if (!(t.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t))) {
+                    // no matched field in the input for a required closed field
+                    throw new IllegalStateException("type mismatch: miss a required closed field");
                 }
             }
-            if (matched)
-                continue;
-
-            IAType t = cachedReqType.getFieldTypes()[i];
-            if (!(t.getTypeTag() == ATypeTag.UNION && NonTaggedFormatUtil.isOptionalField((AUnionType) t))) {
-                // no matched field in the input for a required closed field
-                throw new IllegalStateException("type mismatch: miss a required closed field");
-            }
         }
     }
 
@@ -225,4 +231,44 @@
         }
         recBuilder.write(output, true);
     }
+
+    private void quickSort(int[] index, List<SimpleValueReference> names, int start, int end) {
+        if (end <= start)
+            return;
+        int i = partition(index, names, start, end);
+        quickSort(index, names, start, i - 1);
+        quickSort(index, names, i + 1, end);
+    }
+
+    private int partition(int[] index, List<SimpleValueReference> names, int left, int right) {
+        int i = left - 1;
+        int j = right;
+        while (true) {
+            // grow from the left
+            while (compare(names.get(index[++i]), names.get(index[right])) < 0)
+                ;
+            // lower from the right
+            while (compare(names.get(index[right]), names.get(index[--j])) < 0)
+                if (j == left)
+                    break;
+            if (i >= j)
+                break;
+            // swap i and j
+            swap(index, i, j);
+        }
+        // swap i and right
+        swap(index, i, right); // swap with partition element
+        return i;
+    }
+
+    private void swap(int[] array, int i, int j) {
+        int temp = array[i];
+        array[i] = array[j];
+        array[j] = temp;
+    }
+
+    private int compare(IValueReference a, IValueReference b) {
+        return fieldNameComparator.compare(a.getBytes(), a.getStartIndex() + 1, a.getLength() - 1, b.getBytes(),
+                b.getStartIndex() + 1, b.getLength() - 1);
+    }
 }