[NO ISSUE][COMP] Honor select-list field order

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-61971

Change-Id: Ia6e37080e581b92744ddd9090b291936513c75af
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18401
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
index 78b1e2b..eef5884 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/SqlppCompilationProvider.java
@@ -90,11 +90,12 @@
                 CompilerProperties.COMPILER_SORT_SAMPLES_KEY, CompilerProperties.COMPILER_EXTERNALSCANMEMORY_KEY,
                 CompilerProperties.COMPILER_INDEXONLY_KEY, CompilerProperties.COMPILER_INTERNAL_SANITYCHECK_KEY,
                 CompilerProperties.COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY, CompilerProperties.COMPILER_SUBPLAN_MERGE_KEY,
-                CompilerProperties.COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY, CompilerProperties.COMPILER_ARRAYINDEX_KEY,
-                CompilerProperties.COMPILER_CBO_KEY, CompilerProperties.COMPILER_CBO_TEST_KEY,
-                CompilerProperties.COMPILER_FORCE_JOIN_ORDER_KEY, CompilerProperties.COMPILER_QUERY_PLAN_SHAPE_KEY,
-                CompilerProperties.COMPILER_MIN_MEMORY_ALLOCATION_KEY, CompilerProperties.COMPILER_COLUMN_FILTER_KEY,
-                CompilerProperties.COMPILER_BATCH_LOOKUP_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
+                CompilerProperties.COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY, CompilerProperties.COMPILER_ORDERFIELDS_KEY,
+                CompilerProperties.COMPILER_ARRAYINDEX_KEY, CompilerProperties.COMPILER_CBO_KEY,
+                CompilerProperties.COMPILER_CBO_TEST_KEY, CompilerProperties.COMPILER_FORCE_JOIN_ORDER_KEY,
+                CompilerProperties.COMPILER_QUERY_PLAN_SHAPE_KEY, CompilerProperties.COMPILER_MIN_MEMORY_ALLOCATION_KEY,
+                CompilerProperties.COMPILER_COLUMN_FILTER_KEY, CompilerProperties.COMPILER_BATCH_LOOKUP_KEY,
+                FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
                 CompilerProperties.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY, FuzzyUtils.SIM_FUNCTION_PROP_NAME,
                 FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION,
                 FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 47498ea..db7c21a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -234,9 +234,14 @@
 
         ICcApplicationContext ccAppContext = metadataProvider.getApplicationContext();
         CompilerProperties compilerProperties = ccAppContext.getCompilerProperties();
-        Map<String, Object> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc);
+        Map<String, Object> config = metadataProvider.getConfig();
+        Map<String, Object> querySpecificConfig = validateConfig(config, sourceLoc);
         final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.createPhysicalOptimizationConf(
                 compilerProperties, querySpecificConfig, configurableParameterNames, sourceLoc);
+        if (!config.containsKey(CompilerProperties.COMPILER_ORDERFIELDS_KEY)) {
+            config.put(CompilerProperties.COMPILER_ORDERFIELDS_KEY, Boolean.toString(physOptConf.isOrderField()));
+        }
+
         boolean cboMode = physOptConf.getCBOMode() || physOptConf.getCBOTestMode();
         HeuristicCompilerFactoryBuilder builder =
                 new HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index c364a58..9d53805 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -47,6 +47,7 @@
     "compiler\.min\.memory\.allocation" : true,
     "compiler.min.sortmemory" : 524288,
     "compiler.min.windowmemory" : 524288,
+    "compiler.orderfields" : false,
     "compiler\.parallelism" : 0,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 9e85426..0b23562 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -47,6 +47,7 @@
     "compiler\.min\.memory\.allocation" : true,
     "compiler.min.sortmemory" : 524288,
     "compiler.min.windowmemory" : 524288,
+    "compiler.orderfields" : false,
     "compiler\.parallelism" : -1,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index ac00566..0d77d05 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -47,6 +47,7 @@
     "compiler\.min\.memory\.allocation" : true,
     "compiler.min.sortmemory" : 524288,
     "compiler.min.windowmemory" : 524288,
+    "compiler.orderfields" : false,
     "compiler\.parallelism" : 3,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 957cb84..6f11e1e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -156,7 +156,8 @@
         COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING(
                 getRangedIntegerType(0, Integer.MAX_VALUE),
                 128,
-                "Maximum occurrences of a variable allowed in an expression for inlining");
+                "Maximum occurrences of a variable allowed in an expression for inlining"),
+        COMPILER_ORDERFIELDS(BOOLEAN, AlgebricksConfig.ORDERED_FIELDS, "Enable/disable select order list");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -241,6 +242,8 @@
     public static final String COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING_KEY =
             Option.COMPILER_MAX_VARIABLE_OCCURRENCES_INLINING.ini();
 
+    public static final String COMPILER_ORDERFIELDS_KEY = Option.COMPILER_ORDERFIELDS.ini();
+
     public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
 
     public CompilerProperties(PropertiesAccessor accessor) {
@@ -369,6 +372,10 @@
         return accessor.getBoolean(Option.COMPILER_COLUMN_FILTER);
     }
 
+    public boolean isOrderedFields() {
+        return accessor.getBoolean(Option.COMPILER_ORDERFIELDS);
+    }
+
     public int getRuntimeMemoryOverheadPercentage() {
         return accessor.getInt(Option.COMPILER_RUNTIME_MEMORY_OVERHEAD);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index 28ab077..fe5ec9e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -97,6 +97,8 @@
                 compilerProperties.isColumnFilter());
         int maxVariableOccurrencesForInlining =
                 getMaxVariableOccurrencesForInlining(compilerProperties, querySpecificConfig, sourceLoc);
+        boolean orderFields = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_ORDERFIELDS_KEY,
+                compilerProperties.isOrderedFields());
 
         PhysicalOptimizationConfig physOptConf = new PhysicalOptimizationConfig();
         physOptConf.setFrameSize(frameSize);
@@ -126,6 +128,7 @@
         physOptConf.setMinGroupFrames(compilerProperties.getMinGroupMemoryFrames());
         physOptConf.setMinWindowFrames(compilerProperties.getMinWindowMemoryFrames());
         physOptConf.setMaxVariableOccurrencesForInlining(maxVariableOccurrencesForInlining);
+        physOptConf.setOrderFields(orderFields);
 
         // We should have already validated the parameter names at this point...
         Set<String> filteredParameterNames = new HashSet<>(parameterNames);
diff --git a/asterixdb/asterix-doc/src/site/markdown/ncservice.md b/asterixdb/asterix-doc/src/site/markdown/ncservice.md
index c996e23..2352c02 100644
--- a/asterixdb/asterix-doc/src/site/markdown/ncservice.md
+++ b/asterixdb/asterix-doc/src/site/markdown/ncservice.md
@@ -344,6 +344,7 @@
 | common  | compiler.groupmemory                      | The memory budget (in bytes) for a group by operator instance in a partition | 33554432 (32 MB) |
 | common  | compiler.joinmemory                       | The memory budget (in bytes) for a join operator instance in a partition | 33554432 (32 MB) |
 | common  | compiler.parallelism                      | The degree of parallelism for query execution. Zero means to use the storage parallelism as the query execution parallelism, while other integer values dictate the number of query execution parallel partitions. The system will fall back to use the number of all available CPU cores in the cluster as the degree of parallelism if the number set by a user is too large or too small | 0 |
+| common  | compiler.orderfields                      | Enable/disbale select order fields in the response | false |
 | common  | compiler.sortmemory                       | The memory budget (in bytes) for a sort operator instance in a partition | 33554432 (32 MB) |
 | common  | compiler.sort.parallel                    | Enable full parallel sort for queries | true |
 | common  | compiler.sort.samples                     | The number of samples taken from each partition to guide the sort operation when full parallel sort is enabled | 100 |
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
index 9d52a85..6d9007f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
@@ -176,9 +176,9 @@
         IAType[] fieldTypesArray = fieldTypes.toArray(new IAType[0]);
         ARecordType resultType;
         if (resultTypeIsOpen && knowsAdditonalFieldNames) {
-            resultType = new ARecordType("generalized-record-type", fieldNamesArray, fieldTypesArray, resultTypeIsOpen,
-                    allPossibleAdditionalFieldNames);
             LinkedHashSet<String> resultFieldOrder = generalizeRecordFieldOrderHint(leftType, rightType);
+            resultType = new ARecordType("generalized-record-type", fieldNamesArray, fieldTypesArray, resultTypeIsOpen,
+                    allPossibleAdditionalFieldNames, resultFieldOrder);
             if (resultFieldOrder != null) {
                 resultType.getAnnotations().add(new RecordFieldOrderAnnotation(resultFieldOrder));
             }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 6637af8..e418542 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -22,6 +22,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
 
 import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -39,6 +40,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
 /**
  * This class interprets the binary data representation of a record. One can
  * call getFieldNames, getFieldTypeTags and getFieldValues to get pointable
@@ -56,6 +59,8 @@
     // access results: field names, field types, and field values
     private final List<IVisitablePointable> fieldNames = new ArrayList<>();
     private final List<IVisitablePointable> fieldValues = new ArrayList<>();
+    private final IntArrayList reverseLookupClosedFields = new IntArrayList();
+    private int numFields = 0;
 
     // pointable allocator
     private final PointableAllocator allocator = new PointableAllocator();
@@ -63,6 +68,7 @@
     private final ResettableByteArrayOutputStream typeBos = new ResettableByteArrayOutputStream();
 
     private final ResettableByteArrayOutputStream dataBos = new ResettableByteArrayOutputStream();
+
     private final DataOutputStream dataDos = new DataOutputStream(dataBos);
 
     private final ARecordType inputRecType;
@@ -91,6 +97,22 @@
         try {
             final DataOutputStream typeDos = new DataOutputStream(typeBos);
             final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+            LinkedHashSet<String> allOrderedFields = inputType.getAllOrderedFields();
+            if (allOrderedFields != null) {
+                numFields = allOrderedFields.size();
+                for (String field : allOrderedFields) {
+                    int nameStart = typeBos.size();
+                    typeDos.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+                    utf8Writer.writeUTF8(field, typeDos);
+                    int nameEnd = typeBos.size();
+                    IVisitablePointable typeNameReference = AFlatValuePointable.FACTORY.create(null);
+                    typeNameReference.set(typeBos.getByteArray(), nameStart, nameEnd - nameStart);
+                    fieldNames.add(typeNameReference);
+                    fieldValues.add(missingReference);
+                }
+            }
+
+            int index = 0;
             for (int i = 0; i < numberOfSchemaFields; i++) {
                 // add type name Reference (including a string type tag)
                 int nameStart = typeBos.size();
@@ -99,7 +121,17 @@
                 int nameEnd = typeBos.size();
                 IVisitablePointable typeNameReference = AFlatValuePointable.FACTORY.create(null);
                 typeNameReference.set(typeBos.getByteArray(), nameStart, nameEnd - nameStart);
-                fieldNames.add(typeNameReference);
+                for (; index < numFields; index++) {
+                    if (fieldNames.get(index).equals(typeNameReference)) {
+                        break;
+                    }
+                }
+                if (index == numFields) {
+                    fieldNames.add(typeNameReference);
+                    reverseLookupClosedFields.add(fieldNames.size() - 1);
+                    continue;
+                }
+                reverseLookupClosedFields.add(index);
             }
 
             // initialize a constant: null value bytes reference
@@ -113,6 +145,7 @@
             typeDos.writeByte(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
             int missingFieldEnd = typeBos.size();
             missingReference.set(typeBos.getByteArray(), missingFieldStart, missingFieldEnd - missingFieldStart);
+
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
@@ -126,11 +159,22 @@
         // reset the allocator
         allocator.reset();
 
+        int removeTill = numFields;
+        if (numFields == 0) {
+            removeTill = numberOfSchemaFields;
+        }
         // clean up the returned containers
-        for (int i = fieldNames.size() - 1; i >= numberOfSchemaFields; i--) {
+        for (int i = fieldNames.size() - 1; i >= removeTill; i--) {
             fieldNames.remove(i);
         }
-        fieldValues.clear();
+
+        for (int i = fieldValues.size() - 1; i >= numFields; i--) {
+            fieldValues.remove(i);
+        }
+
+        for (int i = 0; i < numFields; i++) {
+            fieldValues.set(i, missingReference);
+        }
     }
 
     @Override
@@ -176,16 +220,25 @@
                     offsetArrayOffset += 4;
                 }
                 for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) {
+                    int index = reverseLookupClosedFields.get(fieldNumber);
                     if (hasOptionalFields) {
                         byte b1 = b[nullBitMapOffset + fieldNumber / 4];
                         if (RecordUtil.isNull(b1, fieldNumber)) {
                             // set null value (including type tag inside)
-                            fieldValues.add(nullReference);
+                            if (index < numFields) {
+                                fieldValues.set(index, nullReference);
+                            } else {
+                                fieldValues.add(nullReference);
+                            }
                             continue;
                         }
                         if (RecordUtil.isMissing(b1, fieldNumber)) {
                             // set missing value (including type tag inside)
-                            fieldValues.add(missingReference);
+                            if (index < numFields) {
+                                fieldValues.set(index, missingReference);
+                            } else {
+                                fieldValues.add(missingReference);
+                            }
                             continue;
                         }
                     }
@@ -212,12 +265,18 @@
                     int fend = dataBos.size();
                     IVisitablePointable fieldValue = allocator.allocateFieldValue(fieldType);
                     fieldValue.set(dataBos.getByteArray(), fstart, fend - fstart);
-                    fieldValues.add(fieldValue);
+                    if (index < numFields) {
+                        fieldValues.set(index, fieldValue);
+                    } else {
+                        fieldValues.add(fieldValue);
+                    }
                 }
             }
             if (isExpanded) {
                 int numberOfOpenFields = AInt32SerializerDeserializer.getInt(b, openPartOffset);
                 int fieldOffset = openPartOffset + 4 + (8 * numberOfOpenFields);
+                int currentCheck = 0;
+                int reverseLookupIndex = 0;
                 for (int i = 0; i < numberOfOpenFields; i++) {
                     // set the field name (including a type tag, which is a string)
                     int fieldValueLength =
@@ -228,7 +287,22 @@
                     int fnend = dataBos.size();
                     IVisitablePointable fieldName = allocator.allocateEmpty();
                     fieldName.set(dataBos.getByteArray(), fnstart, fnend - fnstart);
-                    fieldNames.add(fieldName);
+                    boolean addItToNumFields = true;
+                    for (; currentCheck < numFields; currentCheck++) {
+                        if (reverseLookupIndex < reverseLookupClosedFields.size()
+                                && currentCheck == reverseLookupClosedFields.get(reverseLookupIndex)) {
+                            reverseLookupIndex++;
+                            continue;
+                        }
+
+                        if (fieldNames.get(currentCheck).equals(fieldName)) {
+                            break;
+                        }
+                    }
+                    if (currentCheck >= numFields) {
+                        addItToNumFields = false;
+                        fieldNames.add(fieldName);
+                    }
                     fieldOffset += fieldValueLength;
 
                     typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[fieldOffset]);
@@ -238,7 +312,12 @@
                     // allocate
                     IVisitablePointable fieldValueAccessor = allocator.allocateFieldValue(typeTag, b, fieldOffset + 1);
                     fieldValueAccessor.set(b, fieldOffset, fieldValueLength);
-                    fieldValues.add(fieldValueAccessor);
+                    if (!addItToNumFields) {
+                        fieldValues.add(fieldValueAccessor);
+                    } else {
+                        fieldValues.set(currentCheck, fieldValueAccessor);
+                        currentCheck++;
+                    }
                     fieldOffset += fieldValueLength;
                 }
             }
@@ -263,5 +342,4 @@
     public <R, T> R accept(IVisitablePointableVisitor<R, T> vistor, T tag) throws HyracksDataException {
         return vistor.visit(this, tag);
     }
-
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java
index 7823637..34d618d 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/ARecordPrinter.java
@@ -59,8 +59,8 @@
 
         ps.print(startRecord);
 
-        final int size = fieldNames.size();
         boolean first = true;
+        final int size = fieldNames.size();
         for (int i = 0; i < size; ++i) {
             final IVisitablePointable fieldName = fieldNames.get(i);
             final IVisitablePointable fieldValue = fieldValues.get(i);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
index 838f6f8..bc3ae60 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
@@ -24,9 +24,11 @@
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.asterix.common.annotations.RecordFieldOrderAnnotation;
+import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
@@ -43,6 +45,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.util.LogRedactionUtil;
 
 public class OpenRecordConstructorResultType implements IResultTypeComputer {
@@ -62,6 +65,13 @@
             return type;
         }
 
+        boolean orderFields = AlgebricksConfig.ORDERED_FIELDS;
+        if (metadataProvider != null) {
+            Map<String, Object> config = metadataProvider.getConfig();
+            orderFields = Boolean.parseBoolean((String) config.getOrDefault(CompilerProperties.COMPILER_ORDERFIELDS_KEY,
+                    String.valueOf(orderFields)));
+        }
+
         Iterator<Mutable<ILogicalExpression>> argIter = f.getArguments().iterator();
         List<String> namesList = new ArrayList<>();
         List<IAType> typesList = new ArrayList<>();
@@ -102,7 +112,12 @@
         IAType[] fieldTypes = typesList.toArray(new IAType[0]);
         ARecordType resultType;
         if (isOpen && canProvideAdditionFieldInfo) {
-            resultType = new ARecordType(null, fieldNames, fieldTypes, isOpen, allPossibleAdditionalFieldNames);
+            if (orderFields) {
+                resultType = new ARecordType(null, fieldNames, fieldTypes, isOpen, allPossibleAdditionalFieldNames,
+                        allPossibleFieldNamesOrdered);
+            } else {
+                resultType = new ARecordType(null, fieldNames, fieldTypes, isOpen, allPossibleAdditionalFieldNames);
+            }
             resultType.getAnnotations().add(new RecordFieldOrderAnnotation(allPossibleFieldNamesOrdered));
         } else {
             resultType = new ARecordType(null, fieldNames, fieldTypes, isOpen);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index fd51433..ba352ac 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -23,6 +23,7 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -54,10 +55,15 @@
 
     private static final long serialVersionUID = 1L;
     private static final JavaType SET = OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class);
+
+    private static final JavaType LIST =
+            OBJECT_MAPPER.getTypeFactory().constructCollectionType(LinkedHashSet.class, String.class);
     private static final String IS_OPEN = "isOpen";
     private static final String FIELD_NAMES = "fieldNames";
     private static final String FIELD_TYPES = "fieldTypes";
     private static final String ADDITIONAL_FIELDS = "additionalFieldNames";
+
+    private static final String ORDERED_FIELDS = "orderedFields";
     private final String[] fieldNames;
     private final IAType[] fieldTypes;
     private final Map<String, Integer> fieldNameToIndexMap = new HashMap<>();
@@ -71,6 +77,8 @@
     // the bounded set of all possible additional field names.
     private final Set<String> allPossibleAdditionalFieldNames;
 
+    private final LinkedHashSet<String> allOrderedFields;
+
     /**
      * @param typeName   the name of the type
      * @param fieldNames the names of the closed fields
@@ -78,7 +86,31 @@
      * @param isOpen     whether the record is open
      */
     public ARecordType(String typeName, String[] fieldNames, IAType[] fieldTypes, boolean isOpen) {
-        this(typeName, fieldNames, fieldTypes, isOpen, null);
+        this(typeName, fieldNames, fieldTypes, isOpen, null, null);
+    }
+
+    /**
+     * @param typeName   the name of the type
+     * @param fieldNames the names of the closed fields
+     * @param fieldTypes the types of the closed fields
+     * @param isOpen     whether the record is open
+     * @param allPossibleAdditionalFieldNames, all possible additional field names.
+     */
+    public ARecordType(String typeName, String[] fieldNames, IAType[] fieldTypes, boolean isOpen,
+            Set<String> allPossibleAdditionalFieldNames) {
+        this(typeName, fieldNames, fieldTypes, isOpen, allPossibleAdditionalFieldNames, null);
+    }
+
+    /**
+     * @param typeName   the name of the type
+     * @param fieldNames the names of the closed fields
+     * @param fieldTypes the types of the closed fields
+     * @param isOpen     whether the record is open
+     * @param allOrderedFields fields in order.
+     */
+    public ARecordType(String typeName, String[] fieldNames, IAType[] fieldTypes, boolean isOpen,
+            LinkedHashSet<String> allOrderedFields) {
+        this(typeName, fieldNames, fieldTypes, isOpen, null, allOrderedFields);
     }
 
     /**
@@ -86,10 +118,11 @@
      * @param fieldNames                       the names of the closed fields
      * @param fieldTypes                       the types of the closed fields
      * @param isOpen                           whether the record is open
-     * @param allPossibleAdditionalFieldNames, all possible additional field names.
+     * @param allPossibleAdditionalFieldNames, all possible additional field names
+     * @param allOrderedFields fields in order.
      */
     public ARecordType(String typeName, String[] fieldNames, IAType[] fieldTypes, boolean isOpen,
-            Set<String> allPossibleAdditionalFieldNames) {
+            Set<String> allPossibleAdditionalFieldNames, LinkedHashSet<String> allOrderedFields) {
         super(typeName);
         this.fieldNames = fieldNames;
         this.fieldTypes = fieldTypes;
@@ -100,6 +133,7 @@
             fieldNameToIndexMap.put(fieldNames[index], index);
         }
         this.allPossibleAdditionalFieldNames = allPossibleAdditionalFieldNames;
+        this.allOrderedFields = allOrderedFields;
     }
 
     public boolean canContainField(String fieldName) {
@@ -383,6 +417,7 @@
         jsonObject.put(IS_OPEN, isOpen);
         jsonObject.putPOJO(FIELD_NAMES, fieldNames);
         jsonObject.putPOJO(ADDITIONAL_FIELDS, allPossibleAdditionalFieldNames);
+        jsonObject.putPOJO(ORDERED_FIELDS, allOrderedFields);
         ArrayNode fieldTypesArray = OBJECT_MAPPER.createArrayNode();
         for (int i = 0; i < fieldTypes.length; i++) {
             fieldTypesArray.add(fieldTypes[i].toJson(registry));
@@ -397,12 +432,17 @@
         boolean isOpen = json.get(IS_OPEN).asBoolean();
         String[] fieldNames = OBJECT_MAPPER.convertValue(json.get(FIELD_NAMES), String[].class);
         Set<String> additionalFields = OBJECT_MAPPER.convertValue(json.get(ADDITIONAL_FIELDS), SET);
+        LinkedHashSet<String> orderedFields = OBJECT_MAPPER.convertValue(json.get(ORDERED_FIELDS), LIST);
         ArrayNode fieldTypesNode = (ArrayNode) json.get(FIELD_TYPES);
         IAType[] fieldTypes = new IAType[fieldTypesNode.size()];
         for (int i = 0; i < fieldTypesNode.size(); i++) {
             fieldTypes[i] = (IAType) registry.deserialize(fieldTypesNode.get(i));
         }
-        return new ARecordType(typeName, fieldNames, fieldTypes, isOpen, additionalFields);
+        return new ARecordType(typeName, fieldNames, fieldTypes, isOpen, additionalFields, orderedFields);
+    }
+
+    public LinkedHashSet<String> getAllOrderedFields() {
+        return allOrderedFields;
     }
 
     public List<IAType> getFieldTypes(List<List<String>> fields) throws AlgebricksException {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
index 465cd29..0b29f7d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -46,5 +46,6 @@
             StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.KILOBYTE);
     public static final boolean BATCH_LOOKUP_DEFAULT = true;
     public static final boolean COLUMN_FILTER_DEFAULT = true;
+    public static final boolean ORDERED_FIELDS = false;
     public static final int MAX_VARIABLE_OCCURRENCES_INLINING_DEFAULT = 128;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 07e8bfc..163fb16 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -64,6 +64,8 @@
     private static final String MIN_WINDOW_FRAMES = "MIN_WINDOW_FRAMES";
     private static final String MAX_VARIABLE_OCCURRENCES_INLINING = "MAX_VARIABLE_OCCURRENCES_INLINING";
 
+    private static final String ORDER_FIELDS = "ORDER_FIELDS";
+
     private final Properties properties = new Properties();
 
     public PhysicalOptimizationConfig() {
@@ -160,6 +162,10 @@
         setInt(MAX_FRAMES_FOR_TEXTSEARCH, frameLimit);
     }
 
+    public void setOrderFields(boolean orderFields) {
+        setBoolean(ORDER_FIELDS, orderFields);
+    }
+
     public int getHashGroupByTableSize() {
         return getInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
     }
@@ -236,6 +242,10 @@
         return getBoolean(SORT_PARALLEL, AlgebricksConfig.SORT_PARALLEL_DEFAULT);
     }
 
+    public boolean isOrderField() {
+        return getBoolean(ORDER_FIELDS, AlgebricksConfig.ORDERED_FIELDS);
+    }
+
     public void setSortParallel(boolean sortParallel) {
         setBoolean(SORT_PARALLEL, sortParallel);
     }