[NO ISSUE] Allow MetadataProvider config to store non String values

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- In many cases, we would like to associate a value with a key
  in MetadataProvider to be accessed during the compilation
  of jobs. However, currently, there is no place to store
  such values, so we ended up storing them in the config
  map.
- The config map is a <String, String> map and so, we would
  write our values as a string and then parse them when needed.
- To avoid this, and to avoid introducing a new map, we simply
  change the config stored in MetadataProvider from <String,String>
  to <String, Object>.

Change-Id: I55b392ad199d74b0f3cffdc38b54593b12ec1a06
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2795
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
index 503a631..d7e05b3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/FuzzyUtils.java
@@ -60,7 +60,7 @@
     }
 
     public static IAObject getSimThreshold(MetadataProvider metadata, String simFuncName) {
-        String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        String simThresholValue = metadata.getProperty(SIM_THRESHOLD_PROP_NAME);
         IAObject ret = null;
         if (simFuncName.equals(JACCARD_FUNCTION_NAME)) {
             if (simThresholValue != null) {
@@ -103,7 +103,7 @@
 
     public static float getSimThreshold(MetadataProvider metadata) {
         float simThreshold = JACCARD_DEFAULT_SIM_THRESHOLD;
-        String simThresholValue = metadata.getPropertyValue(SIM_THRESHOLD_PROP_NAME);
+        String simThresholValue = metadata.getProperty(SIM_THRESHOLD_PROP_NAME);
         if (simThresholValue != null) {
             simThreshold = Float.parseFloat(simThresholValue);
         }
@@ -112,7 +112,7 @@
 
     // TODO: The default function depend on the input types.
     public static String getSimFunction(MetadataProvider metadata) {
-        String simFunction = metadata.getPropertyValue(SIM_FUNCTION_PROP_NAME);
+        String simFunction = metadata.getProperty(SIM_FUNCTION_PROP_NAME);
         if (simFunction == null) {
             simFunction = DEFAULT_SIM_FUNCTION;
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index 728aef6..c925b55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -978,9 +978,9 @@
         }
 
         // Gets all variables from the right (inner) branch.
-        VariableUtilities.getLiveVariables((ILogicalOperator) subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp);
+        VariableUtilities.getLiveVariables(subTree.getRootRef().getValue(), liveVarsInSubTreeRootOp);
         // Gets the used variables from the SELECT or JOIN operator.
-        VariableUtilities.getUsedVariables((ILogicalOperator) topOpRef.getValue(), usedVarsInTopOp);
+        VariableUtilities.getUsedVariables(topOpRef.getValue(), usedVarsInTopOp);
         // Excludes the variables in the condition from the outer branch - in join case.
         for (Iterator<LogicalVariable> iterator = usedVarsInTopOp.iterator(); iterator.hasNext();) {
             LogicalVariable v = iterator.next();
@@ -1066,7 +1066,7 @@
         if (afterTopOpRefs != null) {
             for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) {
                 varsTmpSet.clear();
-                OperatorPropertiesUtil.getFreeVariablesInOp((ILogicalOperator) afterTopOpRef.getValue(), varsTmpSet);
+                OperatorPropertiesUtil.getFreeVariablesInOp(afterTopOpRef.getValue(), varsTmpSet);
                 copyVarsToAnotherList(varsTmpSet, usedVarsAfterTopOp);
             }
         }
@@ -1210,7 +1210,7 @@
         // For the index-nested-loop join case,
         // we propagate all variables that come from the outer relation and are used after join operator.
         // Adds the variables that are both live after JOIN and used after the JOIN operator.
-        VariableUtilities.getLiveVariables((ILogicalOperator) topOpRef.getValue(), liveVarsAfterTopOp);
+        VariableUtilities.getLiveVariables(topOpRef.getValue(), liveVarsAfterTopOp);
         for (LogicalVariable v : usedVarsAfterTopOp) {
             if (!liveVarsAfterTopOp.contains(v) || findVarInTripleVarList(unionVarMap, v, false)) {
                 continue;
@@ -1223,8 +1223,7 @@
         // Replaces the original variables in the operators after the SELECT or JOIN operator to satisfy SSA.
         if (afterTopOpRefs != null) {
             for (Mutable<ILogicalOperator> afterTopOpRef : afterTopOpRefs) {
-                VariableUtilities.substituteVariables((ILogicalOperator) afterTopOpRef.getValue(),
-                        origVarToOutputVarMap, context);
+                VariableUtilities.substituteVariables(afterTopOpRef.getValue(), origVarToOutputVarMap, context);
             }
         }
 
@@ -1808,7 +1807,7 @@
         List<LogicalVariable> dataScanRecordVars = new ArrayList<>();
 
         // Collects the used variables in the given select (join) operator.
-        VariableUtilities.getUsedVariables((ILogicalOperator) topRef.getValue(), usedVarsInSelJoinOpTemp);
+        VariableUtilities.getUsedVariables(topRef.getValue(), usedVarsInSelJoinOpTemp);
 
         // Removes the duplicated variables that are used in the select (join) operator
         // in case where the variable is used multiple times in the operator's expression.
@@ -1841,10 +1840,8 @@
         List<LogicalVariable> liveVarsInSubTreeRootOp = new ArrayList<>();
         List<LogicalVariable> producedVarsInSubTreeRootOp = new ArrayList<>();
 
-        VariableUtilities.getLiveVariables((ILogicalOperator) indexSubTree.getRootRef().getValue(),
-                liveVarsInSubTreeRootOp);
-        VariableUtilities.getProducedVariables((ILogicalOperator) indexSubTree.getRootRef().getValue(),
-                producedVarsInSubTreeRootOp);
+        VariableUtilities.getLiveVariables(indexSubTree.getRootRef().getValue(), liveVarsInSubTreeRootOp);
+        VariableUtilities.getProducedVariables(indexSubTree.getRootRef().getValue(), producedVarsInSubTreeRootOp);
 
         copyVarsToAnotherList(liveVarsInSubTreeRootOp, liveVarsAfterSelJoinOp);
         copyVarsToAnotherList(producedVarsInSubTreeRootOp, liveVarsAfterSelJoinOp);
@@ -2039,10 +2036,8 @@
         for (Mutable<ILogicalOperator> afterSelectRef : afterSelectOpRefs) {
             usedVarsAfterSelectOrJoinOp.clear();
             producedVarsAfterSelectOrJoinOp.clear();
-            VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(),
-                    usedVarsAfterSelectOrJoinOp);
-            VariableUtilities.getProducedVariables((ILogicalOperator) afterSelectRef.getValue(),
-                    producedVarsAfterSelectOrJoinOp);
+            VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsAfterSelectOrJoinOp);
+            VariableUtilities.getProducedVariables(afterSelectRef.getValue(), producedVarsAfterSelectOrJoinOp);
             // Checks whether COUNT exists in the given plan since we can substitute record variable
             // with the PK variable as an optimization because COUNT(record) is equal to COUNT(PK).
             // For this case only, we can replace the record variable with the PK variable.
@@ -2051,14 +2046,13 @@
                 aggOp = (AggregateOperator) afterSelectRefOp;
                 condExprs = aggOp.getExpressions();
                 for (int i = 0; i < condExprs.size(); i++) {
-                    condExpr = (ILogicalExpression) condExprs.get(i).getValue();
+                    condExpr = condExprs.get(i).getValue();
                     if (condExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                         condExprFnCall = (AbstractFunctionCallExpression) condExpr;
                         if (condExprFnCall.getFunctionIdentifier() == BuiltinFunctions.COUNT) {
                             // COUNT found. count on the record ($$0) can be replaced as the PK variable.
                             countAggFunctionIsUsedInThePlan = true;
-                            VariableUtilities.getUsedVariables((ILogicalOperator) afterSelectRef.getValue(),
-                                    usedVarsInCount);
+                            VariableUtilities.getUsedVariables(afterSelectRef.getValue(), usedVarsInCount);
                             break;
                         }
                     }
@@ -2187,7 +2181,7 @@
         }
 
         if (matchedFuncExprs.size() == 1) {
-            condExpr = (ILogicalExpression) optFuncExpr.getFuncExpr();
+            condExpr = optFuncExpr.getFuncExpr();
             condExprFnCall = (AbstractFunctionCallExpression) condExpr;
             for (int i = 0; i < condExprFnCall.getArguments().size(); i++) {
                 Mutable<ILogicalExpression> expr = condExprFnCall.getArguments().get(i);
@@ -2372,7 +2366,7 @@
                 AssignOperator assignOp = (AssignOperator) assignUnnestOp;
                 condExprs = assignOp.getExpressions();
                 for (int i = 0; i < condExprs.size(); i++) {
-                    condExpr = (ILogicalExpression) condExprs.get(i).getValue();
+                    condExpr = condExprs.get(i).getValue();
                     if (condExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT
                             && !targetVars.contains(assignOp.getVariables().get(i))) {
                         targetVars.add(assignOp.getVariables().get(i));
@@ -2400,13 +2394,13 @@
             case LEFT_OUTER_UNNEST_MAP:
                 return topOp;
             case UNIONALL:
-                dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue();
+                dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
                 // Index-only plan case:
                 // The order of operators: 7 unionall <- 6 select <- 5 assign?
                 // <- 4 unnest-map (PIdx) <- 3 split <- 2 unnest-map (SIdx) <- ...
                 // We do this to skip the primary index-search since we are looking for a secondary index-search here.
                 do {
-                    dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue();
+                    dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
                 } while (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT && dataSourceOp.hasInputs());
 
                 if (dataSourceOp.getOperatorTag() != LogicalOperatorTag.SPLIT) {
@@ -2415,7 +2409,7 @@
                 }
 
                 do {
-                    dataSourceOp = (ILogicalOperator) dataSourceOp.getInputs().get(0).getValue();
+                    dataSourceOp = dataSourceOp.getInputs().get(0).getValue();
                 } while (dataSourceOp.getOperatorTag() != LogicalOperatorTag.UNNEST_MAP
                         && dataSourceOp.getOperatorTag() != LogicalOperatorTag.LEFT_OUTER_UNNEST_MAP
                         && dataSourceOp.hasInputs());
@@ -2532,9 +2526,10 @@
      *         false otherwise.
      */
     public static boolean getNoIndexOnlyOption(IOptimizationContext context) {
-        Map<String, String> config = context.getMetadataProvider().getConfig();
+        Map<String, Object> config = context.getMetadataProvider().getConfig();
         if (config.containsKey(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION)) {
-            return Boolean.parseBoolean(config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION));
+            return Boolean
+                    .parseBoolean((String) config.get(AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION));
         }
         return AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION_DEFAULT_VALUE;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 1a88170..ca96a96 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -129,6 +129,7 @@
     private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();
 
     // A white list of supported configurable parameters.
+    public static final String PREFIX_INTERNAL_PARAMETERS = "_internal";
     private static final Set<String> CONFIGURABLE_PARAMETER_NAMES =
             ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
                     CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY,
@@ -217,7 +218,7 @@
             generateLogicalPlan(plan, output.config().getPlanFormat());
         }
         CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
-        Map<String, String> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc);
+        Map<String, Object> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc);
         final PhysicalOptimizationConfig physOptConf =
                 getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig, sourceLoc);
 
@@ -235,7 +236,7 @@
         builder.setMissableTypeComputer(MissableTypeComputer.INSTANCE);
         builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE);
 
-        int parallelism = getParallelism(querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
+        int parallelism = getParallelism((String) querySpecificConfig.get(CompilerProperties.COMPILER_PARALLELISM_KEY),
                 compilerProperties.getParallelism());
         AlgebricksAbsolutePartitionConstraint computationLocations =
                 chooseLocations(clusterInfoCollector, parallelism, metadataProvider.getClusterLocations());
@@ -308,19 +309,19 @@
     }
 
     protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(CompilerProperties compilerProperties,
-            Map<String, String> querySpecificConfig, SourceLocation sourceLoc) throws AlgebricksException {
+            Map<String, Object> querySpecificConfig, SourceLocation sourceLoc) throws AlgebricksException {
         int frameSize = compilerProperties.getFrameSize();
         int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
-                querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
+                (String) querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
                 compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT, sourceLoc);
         int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-                querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
+                (String) querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
                 compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY, sourceLoc);
         int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
-                querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
+                (String) querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
                 compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc);
         int textSearchFrameLimit = getFrameLimit(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY,
-                querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY),
+                (String) querySpecificConfig.get(CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY),
                 compilerProperties.getTextSearchMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_TEXTSEARCH, sourceLoc);
         final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.getPhysicalOptimizationConfig();
         physOptConf.setFrameSize(frameSize);
@@ -482,10 +483,11 @@
     }
 
     // Validates if the query contains unsupported query parameters.
-    private static Map<String, String> validateConfig(Map<String, String> config, SourceLocation sourceLoc)
+    private static Map<String, Object> validateConfig(Map<String, Object> config, SourceLocation sourceLoc)
             throws AlgebricksException {
         for (String parameterName : config.keySet()) {
-            if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) {
+            if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)
+                    && !parameterName.startsWith(PREFIX_INTERNAL_PARAMETERS)) {
                 throw AsterixException.create(ErrorCode.COMPILATION_UNSUPPORTED_QUERY_PARAMETER, sourceLoc,
                         parameterName);
             }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 99ed42d..d4a52fb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -82,7 +82,8 @@
         // Note: The current implementation of the wait for completion flag is problematic due to locking issues:
         // Locks obtained during the start of the feed are not released, and so, the feed can't be stopped
         // and also, read locks over dataverses, datasets, etc, are never released.
-        boolean wait = Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
+        boolean wait =
+                Boolean.parseBoolean((String) metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
         if (wait) {
             IActiveEntityEventSubscriber stoppedSubscriber =
                     new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index e9087c2..bb80406 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -83,7 +83,7 @@
         String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
         MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
         DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
-        String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+        String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
         FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName);
         if (policy == null) {
             policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
@@ -93,7 +93,7 @@
             }
         }
         ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
-        String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+        String csLocations = (String) metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
         List<LogicalVariable> pkVars = new ArrayList<>();
         FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
                 metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 76116c4..5b4c198 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -421,10 +421,13 @@
         }
     }
 
-    protected void handleSetStatement(Statement stmt, Map<String, String> config) {
+    protected void handleSetStatement(Statement stmt, Map<String, String> config) throws CompilationException {
         SetStatement ss = (SetStatement) stmt;
         String pname = ss.getPropName();
         String pvalue = ss.getPropValue();
+        if (pname.startsWith(APIFramework.PREFIX_INTERNAL_PARAMETERS)) {
+            throw new CompilationException(ErrorCode.ILLEGAL_SET_PARAMETER, pname);
+        }
         config.put(pname, pvalue);
     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index c1ccda7..b07a30e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -45,6 +45,7 @@
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
@@ -684,6 +685,9 @@
             int[] keyIndexes, List<Integer> keyIndicators, StorageComponentProvider storageComponentProvider,
             IFrameOperationCallbackFactory frameOpCallbackFactory, boolean hasSecondaries) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        MetadataProvider mdProvider = new MetadataProvider(
+                (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(),
+                MetadataBuiltinEntities.DEFAULT_DATAVERSE);
         org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
                 DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -700,8 +704,9 @@
                 new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
                         indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations,
                         recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
-                        modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1,
-                        frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+                        modificationCallbackFactory, searchCallbackFactory,
+                        keyIndexes.length, recordType, -1, frameOpCallbackFactory == null
+                                ? dataset.getFrameOpCallbackFactory(mdProvider) : frameOpCallbackFactory,
                         MissingWriterFactory.INSTANCE, hasSecondaries);
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, metaType);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
index 830307b..e4e300a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java
@@ -127,7 +127,7 @@
             MetadataProvider metadataProvider = mock(MetadataProvider.class);
 
             @SuppressWarnings("unchecked")
-            Map<String, String> config = mock(Map.class);
+            Map<String, Object> config = mock(Map.class);
             when(metadataProvider.getDefaultDataverseName()).thenReturn(dvName);
             when(metadataProvider.getConfig()).thenReturn(config);
             when(config.get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS)).thenReturn("true");
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 8f28752..1d7ad2d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -50,5 +50,10 @@
         public void close() throws IOException {
             // No Op
         }
+
+        @Override
+        public void fail(Throwable th) {
+            // No Op
+        }
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index f570aa8..d8f3f7d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -177,6 +177,7 @@
     public static final int FIELD_NOT_OF_TYPE = 1089;
     public static final int ARRAY_FIELD_ELEMENTS_MUST_BE_OF_TYPE = 1090;
     public static final int COMPILATION_TYPE_MISMATCH_GENERIC = 1091;
+    public static final int ILLEGAL_SET_PARAMETER = 1092;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 8c09d75..5c67f4e 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -164,6 +164,7 @@
 1088 = Required field %1$s was not found
 1089 = Field %1$s must be of type %2$s but found to be of type %3$s
 1090 = Field %1$s must be of an array of type %2$s but found to contain an item of type %3$s
+1092 = Parameter %1$s cannot be set
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
index 1f564a5..6752f77 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/FunctionUtil.java
@@ -102,7 +102,7 @@
         if (expression == null) {
             return functionDecls;
         }
-        String value = metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS);
+        String value = (String) metadataProvider.getConfig().get(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS);
         boolean includePrivateFunctions = (value != null) ? Boolean.valueOf(value.toLowerCase()) : false;
         Set<CallExpr> functionCalls = functionCollector.getFunctionCalls(expression);
         for (CallExpr functionCall : functionCalls) {
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
index 09a9f90..ecd8c8e 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/SqlppQueryRewriter.java
@@ -168,7 +168,7 @@
     }
 
     protected void inlineWithExpressions() throws CompilationException {
-        String inlineWith = metadataProvider.getConfig().get(INLINE_WITH);
+        String inlineWith = (String) metadataProvider.getConfig().get(INLINE_WITH);
         if (inlineWith != null && inlineWith.equalsIgnoreCase(NOT_INLINE_WITH)) {
             return;
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f2be6cd..f3f5c56 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -148,7 +148,7 @@
     private final StorageProperties storageProperties;
     private final IFunctionManager functionManager;
     private final LockList locks;
-    private final Map<String, String> config;
+    private final Map<String, Object> config;
 
     private Dataverse defaultDataverse;
     private MetadataTransactionContext mdTxnCtx;
@@ -173,8 +173,18 @@
         config = new HashMap<>();
     }
 
-    public String getPropertyValue(String propertyName) {
-        return config.get(propertyName);
+    @SuppressWarnings("unchecked")
+    public <T> T getProperty(String name) {
+        return (T) config.get(name);
+    }
+
+    public void setProperty(String name, Object value) {
+        config.put(name, value);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T removeProperty(String name) {
+        return (T) config.remove(name);
     }
 
     public void disableBlockingOperator() {
@@ -186,7 +196,7 @@
     }
 
     @Override
-    public Map<String, String> getConfig() {
+    public Map<String, Object> getConfig() {
         return config;
     }
 
@@ -296,7 +306,7 @@
      */
     public ARecordType findOutputRecordType() throws AlgebricksException {
         return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
-                getPropertyValue("output-record-type"));
+                getProperty("output-record-type"));
     }
 
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index d0a22b7..8471d45 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -682,7 +682,7 @@
                 datasetPartitions, isSink);
     }
 
-    public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
+    public IFrameOperationCallbackFactory getFrameOpCallbackFactory(MetadataProvider mdProvider) {
         return NoOpFrameOperationCallbackFactory.INSTANCE;
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 6d81145..bbdfadf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -447,7 +447,7 @@
         RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
         op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
                 missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
-                dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries);
+                dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, itemType, fieldIdx, hasSecondaries);
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index dba6760..3df1b13 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -256,14 +256,19 @@
 
                 @Override
                 public void frameCompleted() throws HyracksDataException {
-                    callback.frameCompleted();
                     appender.write(writer, true);
+                    callback.frameCompleted();
                 }
 
                 @Override
                 public void close() throws IOException {
                     callback.close();
                 }
+
+                @Override
+                public void fail(Throwable th) {
+                    callback.fail(th);
+                }
             };
         } catch (Throwable e) { // NOSONAR: Re-thrown
             throw HyracksDataException.create(e);
@@ -305,7 +310,12 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int itemCount = accessor.getTupleCount();
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+        try {
+            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+        } catch (Throwable th) {// NOSONAR: Must notify of all failures
+            frameOpCallback.fail(th);
+            throw th;
+        }
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index be227ec..c0d18df 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
-import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
-
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -705,7 +703,7 @@
             }
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "LogFlusher is terminating abnormally. System is in unusable state; halting", e);
-            ExitUtil.halt(EC_IMMEDIATE_HALT);
+            ExitUtil.halt(ExitUtil.EC_TXN_LOG_FLUSHER_FAILURE);
             throw new AssertionError("not reachable");
         } finally {
             if (interrupted) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index ae66ee2..efa9c1c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -212,5 +212,6 @@
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
             throws AlgebricksException;
 
-    public Map<String, String> getConfig();
+    public Map<String, Object> getConfig();
+
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 066b6c1..9ca4bdb 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -580,7 +580,7 @@
                 }
                 case UNORDERED_PARTITIONED: {
                     List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
-                    String hashMergeHint = context.getMetadataProvider().getConfig().get(HASH_MERGE);
+                    String hashMergeHint = (String) context.getMetadataProvider().getConfig().get(HASH_MERGE);
                     if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
                         pop = new HashPartitionExchangePOperator(varList, domain);
                         break;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 627e972..77623c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -73,7 +73,7 @@
         }
         if (registrationException != null) {
             LOGGER.fatal("Registering with {} failed with exception", this, registrationException);
-            ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT);
+            ExitUtil.halt(ExitUtil.EC_NODE_REGISTRATION_FAILURE);
         }
         return getCcId();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 040fe03..cde26d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.ipc.impl;
 
-import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -344,7 +342,7 @@
                 }
             } catch (Exception e) {
                 LOGGER.fatal("Unrecoverable networking failure; Halting...", e);
-                ExitUtil.halt(EC_IMMEDIATE_HALT);
+                ExitUtil.halt(ExitUtil.EC_NETWORK_FAILURE);
             }
             return false;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index df78c53..78e5862 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -33,4 +33,11 @@
      * @throws HyracksDataException
      */
     void frameCompleted() throws HyracksDataException;
+
+    /**
+     * Called when the task has failed.
+     *
+     * @param th
+     */
+    void fail(Throwable th);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index f7c401a..75865b9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -42,6 +42,9 @@
     public static final int EC_UNHANDLED_EXCEPTION = 11;
     public static final int EC_FAILED_TO_DELETE_CORRUPTED_RESOURCES = 12;
     public static final int EC_ERROR_CREATING_RESOURCES = 13;
+    public static final int EC_TXN_LOG_FLUSHER_FAILURE = 14;
+    public static final int EC_NODE_REGISTRATION_FAILURE = 15;
+    public static final int EC_NETWORK_FAILURE = 16;
     public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;