ASTERIXDB-1112: hash-lookup type inferers

Change-Id: I6ec217536e3c36a1e671da43c5d2fb5eafb8fd08
Reviewed-on: https://asterix-gerrit.ics.uci.edu/409
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
index 72c7eb1..d00ad3c 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/formats/NonTaggedDataFormat.java
@@ -371,10 +371,12 @@
 
     private static LogicalVariable METADATA_DUMMY_VAR = new LogicalVariable(-1);
 
-    private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
+    private static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<>();
 
     public static final String NON_TAGGED_DATA_FORMAT = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
 
+    private Map<FunctionIdentifier, FunctionTypeInferer> functionTypeInferers = new HashMap<>();
+
     static {
         typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
         typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
@@ -695,6 +697,8 @@
             mgr.registerFunction(fdFactory);
         }
         FunctionManagerHolder.setFunctionManager(mgr);
+
+        registerTypeInferers();
     }
 
     @Override
@@ -895,162 +899,193 @@
         if (fd == null) {
             throw new AsterixRuntimeException("Unresolved function " + fnId);
         }
-        typeInference(expr, fd, context);
+        final FunctionIdentifier fid = fd.getIdentifier();
+        if (functionTypeInferers.containsKey(fid)) {
+            functionTypeInferers.get(fid).infer(expr, fd, context);
+        }
         return fd;
     }
 
-    private void typeInference(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context)
-            throws AlgebricksException {
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.LISTIFY)) {
-            AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expr;
-            if (f.getArguments().size() == 0) {
-                ((ListifyAggregateDescriptor) fd).reset(new AOrderedListType(null, null));
-            } else {
-                IAType itemType = (IAType) context.getType(f.getArguments().get(0).getValue());
-                if (itemType instanceof AUnionType) {
-                    if (((AUnionType) itemType).isNullableType())
-                        itemType = ((AUnionType) itemType).getNullableType();
-                    else
-                        // Convert UNION types into ANY.
-                        itemType = BuiltinType.ANY;
-                }
-                ((ListifyAggregateDescriptor) fd).reset(new AOrderedListType(itemType, null));
-            }
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.RECORD_MERGE)) {
-            AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expr;
-            IAType outType = (IAType) context.getType(expr);
-            IAType type0 = (IAType) context.getType(f.getArguments().get(0).getValue());
-            IAType type1 = (IAType) context.getType(f.getArguments().get(1).getValue());
-            ((RecordMergeDescriptor) fd).reset(outType, type0, type1);
-        }
-
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.CAST_RECORD)) {
-            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
-            ARecordType rt = (ARecordType) TypeComputerUtilities.getRequiredType(funcExpr);
-            IAType it = (IAType) context.getType(funcExpr.getArguments().get(0).getValue());
-            if (it.getTypeTag().equals(ATypeTag.ANY)) {
-                it = DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE;
-            }
-            ((CastRecordDescriptor) fd).reset(rt, (ARecordType) it);
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.CAST_LIST)) {
-            AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
-            AbstractCollectionType rt = (AbstractCollectionType) TypeComputerUtilities.getRequiredType(funcExpr);
-            IAType it = (IAType) context.getType(funcExpr.getArguments().get(0).getValue());
-            if (it.getTypeTag().equals(ATypeTag.ANY)) {
-                it = DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE;
-            }
-            ((CastListDescriptor) fd).reset(rt, (AbstractCollectionType) it);
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.FLOW_RECORD)) {
-            ARecordType it = (ARecordType) TypeComputerUtilities.getInputType((AbstractFunctionCallExpression) expr);
-            ((FlowRecordDescriptor) fd).reset(it);
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR)) {
-            ARecordType rt = (ARecordType) context.getType(expr);
-            ((OpenRecordConstructorDescriptor) fd).reset(rt,
-                    computeOpenFields((AbstractFunctionCallExpression) expr, rt));
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR)) {
-            ((ClosedRecordConstructorDescriptor) fd).reset((ARecordType) context.getType(expr));
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR)) {
-            ((OrderedListConstructorDescriptor) fd).reset((AOrderedListType) context.getType(expr));
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR)) {
-            ((UnorderedListConstructorDescriptor) fd).reset((AUnorderedListType) context.getType(expr));
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX)) {
-            AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
-            IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
-            switch (t.getTypeTag()) {
-                case RECORD: {
-                    ARecordType recType = (ARecordType) t;
-                    ((FieldAccessByIndexDescriptor) fd).reset(recType);
-                    break;
-                }
-                case UNION: {
-                    AUnionType unionT = (AUnionType) t;
-                    if (unionT.isNullableType()) {
-                        IAType t2 = unionT.getNullableType();
-                        if (t2.getTypeTag() == ATypeTag.RECORD) {
-                            ARecordType recType = (ARecordType) t2;
-                            ((FieldAccessByIndexDescriptor) fd).reset(recType);
-                            break;
-                        }
-                    }
-                    throw new NotImplementedException("field-access-by-index for data of type " + t);
-                }
-                default: {
-                    throw new NotImplementedException("field-access-by-index for data of type " + t);
-                }
-            }
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED)) {
-            AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
-            IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
-            AOrderedList fieldPath = (AOrderedList) (((AsterixConstantValue) ((ConstantExpression) fce.getArguments()
-                    .get(1).getValue()).getValue()).getObject());
-            List<String> listFieldPath = new ArrayList<String>();
-            for (int i = 0; i < fieldPath.size(); i++) {
-                listFieldPath.add(((AString) fieldPath.getItem(i)).getStringValue());
-            }
-
-            switch (t.getTypeTag()) {
-                case RECORD: {
-                    ARecordType recType = (ARecordType) t;
-                    ((FieldAccessNestedDescriptor) fd).reset(recType, listFieldPath);
-                    break;
-                }
-                default: {
-                    throw new NotImplementedException("field-access-nested for data of type " + t);
-                }
-            }
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.GET_RECORD_FIELDS)) {
-            AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
-            IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
-            if (t.getTypeTag().equals(ATypeTag.RECORD)) {
-                ARecordType recType = (ARecordType) t;
-                ((GetRecordFieldsDescriptor) fd).reset(recType);
-            } else {
-                throw new NotImplementedException("get-record-fields for data of type " + t);
-            }
-        }
-        if (fd.getIdentifier().equals(AsterixBuiltinFunctions.GET_RECORD_FIELD_VALUE)) {
-            AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
-            IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
-            if (t.getTypeTag().equals(ATypeTag.RECORD)) {
-                ARecordType recType = (ARecordType) t;
-                ((GetRecordFieldValueDescriptor) fd).reset(recType);
-            } else {
-                throw new NotImplementedException("get-record-field-value for data of type " + t);
-            }
-        }
+    interface FunctionTypeInferer {
+        void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException;
     }
 
-    private boolean[] computeOpenFields(AbstractFunctionCallExpression expr, ARecordType recType) {
-        int n = expr.getArguments().size() / 2;
-        boolean[] open = new boolean[n];
-        for (int i = 0; i < n; i++) {
-            Mutable<ILogicalExpression> argRef = expr.getArguments().get(2 * i);
-            ILogicalExpression arg = argRef.getValue();
-            if (arg.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
-                String fn = ((AString) ((AsterixConstantValue) ((ConstantExpression) arg).getValue()).getObject())
-                        .getStringValue();
-                open[i] = true;
-                for (String s : recType.getFieldNames()) {
-                    if (s.equals(fn)) {
-                        open[i] = false;
-                        break;
+    void registerTypeInferers() {
+        functionTypeInferers.put(AsterixBuiltinFunctions.LISTIFY, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expr;
+                if (f.getArguments().size() == 0) {
+                    ((ListifyAggregateDescriptor) fd).reset(new AOrderedListType(null, null));
+                } else {
+                    IAType itemType = (IAType) context.getType(f.getArguments().get(0).getValue());
+                    if (itemType instanceof AUnionType) {
+                        if (((AUnionType) itemType).isNullableType())
+                            itemType = ((AUnionType) itemType).getNullableType();
+                        else
+                            // Convert UNION types into ANY.
+                            itemType = BuiltinType.ANY;
+                    }
+                    ((ListifyAggregateDescriptor) fd).reset(new AOrderedListType(itemType, null));
+                }
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.RECORD_MERGE, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expr;
+                IAType outType = (IAType) context.getType(expr);
+                IAType type0 = (IAType) context.getType(f.getArguments().get(0).getValue());
+                IAType type1 = (IAType) context.getType(f.getArguments().get(1).getValue());
+                ((RecordMergeDescriptor) fd).reset(outType, type0, type1);
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.CAST_RECORD, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+                ARecordType rt = (ARecordType) TypeComputerUtilities.getRequiredType(funcExpr);
+                IAType it = (IAType) context.getType(funcExpr.getArguments().get(0).getValue());
+                if (it.getTypeTag().equals(ATypeTag.ANY)) {
+                    it = DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE;
+                }
+                ((CastRecordDescriptor) fd).reset(rt, (ARecordType) it);
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.CAST_LIST, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+                AbstractCollectionType rt = (AbstractCollectionType) TypeComputerUtilities.getRequiredType(funcExpr);
+                IAType it = (IAType) context.getType(funcExpr.getArguments().get(0).getValue());
+                if (it.getTypeTag().equals(ATypeTag.ANY)) {
+                    it = DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE;
+                }
+                ((CastListDescriptor) fd).reset(rt, (AbstractCollectionType) it);
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.FLOW_RECORD, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                ARecordType it = (ARecordType) TypeComputerUtilities.getInputType((AbstractFunctionCallExpression) expr);
+                ((FlowRecordDescriptor) fd).reset(it);
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                ARecordType rt = (ARecordType) context.getType(expr);
+                ((OpenRecordConstructorDescriptor) fd).reset(rt,
+                        computeOpenFields((AbstractFunctionCallExpression) expr, rt));
+            }
+
+            private boolean[] computeOpenFields(AbstractFunctionCallExpression expr, ARecordType recType) {
+                int n = expr.getArguments().size() / 2;
+                boolean[] open = new boolean[n];
+                for (int i = 0; i < n; i++) {
+                    Mutable<ILogicalExpression> argRef = expr.getArguments().get(2 * i);
+                    ILogicalExpression arg = argRef.getValue();
+                    if (arg.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+                        String fn = ((AString) ((AsterixConstantValue) ((ConstantExpression) arg).getValue()).getObject())
+                                .getStringValue();
+                        open[i] = true;
+                        for (String s : recType.getFieldNames()) {
+                            if (s.equals(fn)) {
+                                open[i] = false;
+                                break;
+                            }
+                        }
+                    } else {
+                        open[i] = true;
                     }
                 }
-            } else {
-                open[i] = true;
+                return open;
             }
-        }
-        return open;
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                ((ClosedRecordConstructorDescriptor) fd).reset((ARecordType) context.getType(expr));
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.ORDERED_LIST_CONSTRUCTOR, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                ((OrderedListConstructorDescriptor) fd).reset((AOrderedListType) context.getType(expr));
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.UNORDERED_LIST_CONSTRUCTOR, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                ((UnorderedListConstructorDescriptor) fd).reset((AUnorderedListType) context.getType(expr));
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
+                IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
+                switch (t.getTypeTag()) {
+                    case RECORD: {
+                        ARecordType recType = (ARecordType) t;
+                        ((FieldAccessByIndexDescriptor) fd).reset(recType);
+                        break;
+                    }
+                    case UNION: {
+                        AUnionType unionT = (AUnionType) t;
+                        if (unionT.isNullableType()) {
+                            IAType t2 = unionT.getNullableType();
+                            if (t2.getTypeTag() == ATypeTag.RECORD) {
+                                ARecordType recType = (ARecordType) t2;
+                                ((FieldAccessByIndexDescriptor) fd).reset(recType);
+                                break;
+                            }
+                        }
+                        throw new NotImplementedException("field-access-by-index for data of type " + t);
+                    }
+                    default: {
+                        throw new NotImplementedException("field-access-by-index for data of type " + t);
+                    }
+                }
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.FIELD_ACCESS_NESTED, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
+                IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
+                AOrderedList fieldPath = (AOrderedList) (((AsterixConstantValue) ((ConstantExpression) fce.getArguments()
+                        .get(1).getValue()).getValue()).getObject());
+                List<String> listFieldPath = new ArrayList<String>();
+                for (int i = 0; i < fieldPath.size(); i++) {
+                    listFieldPath.add(((AString) fieldPath.getItem(i)).getStringValue());
+                }
+
+                switch (t.getTypeTag()) {
+                    case RECORD: {
+                        ARecordType recType = (ARecordType) t;
+                        ((FieldAccessNestedDescriptor) fd).reset(recType, listFieldPath);
+                        break;
+                    }
+                    default: {
+                        throw new NotImplementedException("field-access-nested for data of type " + t);
+                    }
+                }
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.GET_RECORD_FIELDS, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
+                IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
+                if (t.getTypeTag().equals(ATypeTag.RECORD)) {
+                    ARecordType recType = (ARecordType) t;
+                    ((GetRecordFieldsDescriptor) fd).reset(recType);
+                } else {
+                    throw new NotImplementedException("get-record-fields for data of type " + t);
+                }
+            }
+        });
+        functionTypeInferers.put(AsterixBuiltinFunctions.GET_RECORD_FIELD_VALUE, new FunctionTypeInferer() {
+            public void infer(ILogicalExpression expr, IFunctionDescriptor fd, IVariableTypeEnvironment context) throws AlgebricksException {
+                AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) expr;
+                IAType t = (IAType) context.getType(fce.getArguments().get(0).getValue());
+                if (t.getTypeTag().equals(ATypeTag.RECORD)) {
+                    ARecordType recType = (ARecordType) t;
+                    ((GetRecordFieldValueDescriptor) fd).reset(recType);
+                } else {
+                    throw new NotImplementedException("get-record-field-value for data of type " + t);
+                }
+            }
+        });
     }
 
     @Override