[ASTERIXDB-3459][RT] Extend ITuplePartitionComputerFactory and ITuplePartitionComputer

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Extend ITuplePartitionComputerFactory and ITuplePartitionComputer with more
methods for extensions to use.

- Refactored CastEvaluatorDescriptor.
- Added internal functions for extensions.

Ext-ref: MB-61350
Change-Id: I6e763c16eb93185d84a2c181b0b402798ab5cb14
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18484
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ritik Raj <raj.ritik9835@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 21b8b60..57b842a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -1288,6 +1288,10 @@
     public static final FunctionIdentifier PUT_AUTOGENERATED_KEY =
             FunctionConstants.newAsterix("put-autogenerated-key", FunctionIdentifier.VARARGS);
 
+    public static final FunctionIdentifier ACCESS_FIELD = FunctionConstants.newAsterix("access-field", 2);
+
+    public static final FunctionIdentifier ACCESS_NESTED_FIELD = FunctionConstants.newAsterix("access-nested-field", 2);
+
     static {
         // first, take care of Algebricks builtin functions
         addFunction(IS_MISSING, BooleanOnlyTypeComputer.INSTANCE, true);
@@ -2146,6 +2150,9 @@
         // used by UPSERT/INSERT for collections with autogenerated uuid
         addPrivateFunction(PUT_AUTOGENERATED_KEY, PutAutogeneratedKeyTypeComputer.INSTANCE, false);
 
+        addPrivateFunction(ACCESS_FIELD, FieldAccessByNameResultType.INSTANCE, false);
+        addPrivateFunction(ACCESS_NESTED_FIELD, FieldAccessNestedResultType.INSTANCE, false);
+
     }
 
     static {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java
index 6a0d6fb..b168119 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeDescriptor.java
@@ -27,10 +27,7 @@
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * This runtime function casts an input ADM instance of a certain type into the form
@@ -73,15 +70,6 @@
     @Override
     public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
         final IScalarEvaluatorFactory recordEvalFactory = args[0];
-
-        return new IScalarEvaluatorFactory() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
-                IScalarEvaluator argEval = recordEvalFactory.createScalarEvaluator(ctx);
-                return new CastTypeEvaluator(reqType, inputType, argEval, sourceLoc);
-            }
-        };
+        return new CastTypeEvaluatorFactory(recordEvalFactory, reqType, inputType, sourceLoc);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluatorFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluatorFactory.java
new file mode 100644
index 0000000..eb56b5f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluatorFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class CastTypeEvaluatorFactory implements IScalarEvaluatorFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final IScalarEvaluatorFactory recordEvalFactory;
+    private final IAType reqType;
+    private final IAType inputType;
+    private final SourceLocation sourceLoc;
+
+    public CastTypeEvaluatorFactory(IScalarEvaluatorFactory recordEvalFactory, IAType reqType, IAType inputType,
+            SourceLocation sourceLoc) {
+        this.recordEvalFactory = recordEvalFactory;
+        this.reqType = reqType;
+        this.inputType = inputType;
+        this.sourceLoc = sourceLoc;
+    }
+
+    @Override
+    public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
+        IScalarEvaluator argEval = recordEvalFactory.createScalarEvaluator(ctx);
+        return new CastTypeEvaluator(reqType, inputType, argEval, sourceLoc);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/AccessFieldDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/AccessFieldDescriptor.java
new file mode 100644
index 0000000..4ad234f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/AccessFieldDescriptor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import org.apache.asterix.common.annotations.MissingNullInOutFunction;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+
+@MissingNullInOutFunction
+public class AccessFieldDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = AccessFieldDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.ACCESS_FIELD;
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new FieldAccessByNameEvalFactory(args[0], args[1], sourceLoc, getIdentifier());
+    }
+
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/AccessNestedFieldDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/AccessNestedFieldDescriptor.java
new file mode 100644
index 0000000..87bb773
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/AccessNestedFieldDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import java.util.List;
+
+import org.apache.asterix.common.annotations.MissingNullInOutFunction;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+
+@MissingNullInOutFunction
+public class AccessNestedFieldDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new AccessNestedFieldDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return new FunctionTypeInferers.FieldAccessNestedTypeInferer();
+        }
+    };
+
+    private static final long serialVersionUID = 1L;
+    private ARecordType recType;
+    private List<String> fldName;
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        this.recType = (ARecordType) states[0];
+        this.fldName = (List<String>) states[1];
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.ACCESS_NESTED_FIELD;
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        return new FieldAccessNestedEvalFactory(args[0], recType, fldName, sourceLoc, getIdentifier());
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 214c8a0..86a0790 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -536,6 +536,8 @@
 import org.apache.asterix.runtime.evaluators.functions.bitwise.BitXorDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.bitwise.IsBitSetWithAllFlagDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.bitwise.IsBitSetWithoutAllFlagDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.records.AccessFieldDescriptor;
+import org.apache.asterix.runtime.evaluators.functions.records.AccessNestedFieldDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.records.FieldAccessByIndexDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.records.FieldAccessByNameDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.records.FieldAccessNestedDescriptor;
@@ -990,7 +992,9 @@
         // Element accessors.
         fc.add(FieldAccessByIndexDescriptor.FACTORY);
         fc.add(FieldAccessByNameDescriptor.FACTORY);
+        fc.add(AccessFieldDescriptor.FACTORY);
         fc.add(FieldAccessNestedDescriptor.FACTORY);
+        fc.add(AccessNestedFieldDescriptor.FACTORY);
 
         fc.add(AnyCollectionMemberDescriptor.FACTORY);
         fc.add(GetItemDescriptor.FACTORY);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
index 2068ef3..6cd8fd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputer.java
@@ -41,4 +41,9 @@
      */
     public default void initialize() throws HyracksDataException {
     }
+
+    default int partition(IFrameTupleAccessor accessor, int tIndex, int evaluatorIndex, int nParts)
+            throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
index 81f9053..26a4ea9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
@@ -21,7 +21,13 @@
 import java.io.Serializable;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ITuplePartitionComputerFactory extends Serializable {
-    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext);
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext)
+            throws HyracksDataException;
+
+    default IBinaryHashFunction[] getBinaryHashFunctions() {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 9ee2105..56d561a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -53,10 +53,7 @@
 
     @Override
     public ITuplePartitionComputer createPartitioner(IHyracksTaskContext ctx) {
-        final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length];
-        for (int i = 0; i < hashFunctionFactories.length; ++i) {
-            hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
-        }
+        final IBinaryHashFunction[] hashFunctions = getBinaryHashFunctions();
         if (partitionsMap == null) {
             return new FieldHashPartitionComputer(hashFields, hashFunctions, null);
         } else {
@@ -69,4 +66,17 @@
             return new FieldHashPartitionComputer(hashFields, hashFunctions, storagePartition2Compute);
         }
     }
+
+    @Override
+    public IBinaryHashFunction[] getBinaryHashFunctions() {
+        final IBinaryHashFunction[] hashFunctions = new IBinaryHashFunction[hashFunctionFactories.length];
+        for (int i = 0; i < hashFunctionFactories.length; ++i) {
+            hashFunctions[i] = hashFunctionFactories[i].createBinaryHashFunction();
+        }
+        return hashFunctions;
+    }
+
+    public int[][] getPartitionsMap() {
+        return partitionsMap;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
index 1821d78..d26bd5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RepartitionComputerFactory.java
@@ -36,7 +36,8 @@
     }
 
     @Override
-    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext) {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext hyracksTaskContext)
+            throws HyracksDataException {
         return new ITuplePartitionComputer() {
             private ITuplePartitionComputer delegate = delegateFactory.createPartitioner(hyracksTaskContext);