[NO ISSUE] Readability improvements
Change-Id: I8b27805be1668fe6591c442fcd44020a418c2931
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2333
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 0f91275..a18277e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.PrintWriter;
-import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -95,6 +94,7 @@
import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
@@ -106,6 +106,7 @@
import org.apache.hyracks.control.common.config.OptionTypes;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.ImmutableSet;
/**
@@ -201,63 +202,48 @@
}
public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider,
- Query rwQ, int varCounter, String outputDatasetName, SessionOutput output, ICompiledDmlStatement statement)
- throws AlgebricksException, RemoteException, ACIDException {
+ Query query, int varCounter, String outputDatasetName, SessionOutput output,
+ ICompiledDmlStatement statement) throws AlgebricksException, ACIDException {
+
+ // establish up front whether this is a query and whether the statement is a LOAD
+ final boolean isQuery = query != null;
+ final boolean isLoad = statement != null && statement.getKind() == Statement.Kind.LOAD;
SessionConfig conf = output.config();
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
output.out().println();
printPlanPrefix(output, "Rewritten expression tree");
- if (rwQ != null) {
- rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
+ if (isQuery) {
+ query.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
}
printPlanPostfix(output);
}
- TxnId txnId = TxnIdFactory.create();
+ final TxnId txnId = TxnIdFactory.create();
metadataProvider.setTxnId(txnId);
ILangExpressionToPlanTranslator t =
translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter);
- ILogicalPlan plan;
- // statement = null when it's a query
- if (statement == null || statement.getKind() != Statement.Kind.LOAD) {
- plan = t.translate(rwQ, outputDatasetName, statement);
- } else {
- plan = t.translateLoad(statement);
- }
+ ILogicalPlan plan = isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement);
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
output.out().println();
printPlanPrefix(output, "Logical plan");
- if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
+ if (isQuery || isLoad) {
PlanPrettyPrinter.printPlan(plan, getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
}
printPlanPostfix(output);
}
CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties();
- int frameSize = compilerProperties.getFrameSize();
- Map<String, String> querySpecificConfig = metadataProvider.getConfig();
- validateConfig(querySpecificConfig); // Validates the user-overridden query parameters.
- int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
- querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
- compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT);
- int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
- querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
- compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY);
- int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
- querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
- compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN);
- OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
- OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
- OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
- OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
+ Map<String, String> querySpecificConfig = validateConfig(metadataProvider.getConfig());
+ final PhysicalOptimizationConfig physOptConf =
+ getPhysicalOptimizationConfig(compilerProperties, querySpecificConfig);
HeuristicCompilerFactoryBuilder builder =
new HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE);
- builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
+ builder.setPhysicalOptimizationConfig(physOptConf);
builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites(metadataProvider.getApplicationContext()));
builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites(metadataProvider.getApplicationContext()));
IDataFormat format = metadataProvider.getDataFormat();
@@ -285,7 +271,7 @@
PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
} else {
printPlanPrefix(output, "Optimized logical plan");
- if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
+ if (isQuery || isLoad) {
PlanPrettyPrinter.printPlan(plan,
getPrettyPrintVisitor(output.config().getLpfmt(), output.out()), 0);
}
@@ -293,7 +279,7 @@
}
}
}
- if (rwQ != null && rwQ.isExplain()) {
+ if (isQuery && query.isExplain()) {
try {
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
@@ -318,25 +304,7 @@
builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
builder.setMissingWriterFactory(format.getMissingWriterFactory());
builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
-
- final SessionConfig.OutputFormat outputFormat = conf.fmt();
- switch (outputFormat) {
- case LOSSLESS_JSON:
- builder.setPrinterProvider(format.getLosslessJSONPrinterFactoryProvider());
- break;
- case CSV:
- builder.setPrinterProvider(format.getCSVPrinterFactoryProvider());
- break;
- case ADM:
- builder.setPrinterProvider(format.getADMPrinterFactoryProvider());
- break;
- case CLEAN_JSON:
- builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider());
- break;
- default:
- throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat);
- }
-
+ builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt()));
builder.setSerializerDeserializerProvider(format.getSerdeProvider());
builder.setTypeTraitProvider(format.getTypeTraitProvider());
builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
@@ -345,24 +313,66 @@
new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction());
JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory);
- // When the top-level statement is a query, the statement parameter is null.
- if (statement == null) {
+ if (isQuery) {
// Sets a required capacity, only for read-only queries.
// DDLs and DMLs are considered not that frequent.
// limit the computation locations to the locations that will be used in the query
- final AlgebricksAbsolutePartitionConstraint jobLocations = getJobLocations(spec,
- metadataProvider.getApplicationContext().getNodeJobTracker(), computationLocations);
- final IClusterCapacity jobRequiredCapacity = ResourceUtils.getRequiredCapacity(plan, jobLocations,
- sortFrameLimit, groupFrameLimit, joinFrameLimit, frameSize);
+ final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker();
+ final AlgebricksAbsolutePartitionConstraint jobLocations =
+ getJobLocations(spec, nodeJobTracker, computationLocations);
+ final IClusterCapacity jobRequiredCapacity =
+ ResourceUtils.getRequiredCapacity(plan, jobLocations, physOptConf);
spec.setRequiredClusterCapacity(jobRequiredCapacity);
}
+ printJobSpec(query, spec, conf, output);
+ return spec;
+ }
+
+ protected PhysicalOptimizationConfig getPhysicalOptimizationConfig(CompilerProperties compilerProperties,
+ Map<String, String> querySpecificConfig) throws AlgebricksException {
+ int frameSize = compilerProperties.getFrameSize();
+ int sortFrameLimit = getFrameLimit(CompilerProperties.COMPILER_SORTMEMORY_KEY,
+ querySpecificConfig.get(CompilerProperties.COMPILER_SORTMEMORY_KEY),
+ compilerProperties.getSortMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_SORT);
+ int groupFrameLimit = getFrameLimit(CompilerProperties.COMPILER_GROUPMEMORY_KEY,
+ querySpecificConfig.get(CompilerProperties.COMPILER_GROUPMEMORY_KEY),
+ compilerProperties.getGroupMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+ int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
+ querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
+ compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN);
+ final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.getPhysicalOptimizationConfig();
+ physOptConf.setFrameSize(frameSize);
+ physOptConf.setMaxFramesExternalSort(sortFrameLimit);
+ physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit);
+ physOptConf.setMaxFramesForJoin(joinFrameLimit);
+ return physOptConf;
+ }
+
+ protected IPrinterFactoryProvider getPrinterFactoryProvider(IDataFormat format,
+ SessionConfig.OutputFormat outputFormat) throws AlgebricksException {
+ switch (outputFormat) {
+ case LOSSLESS_JSON:
+ return format.getLosslessJSONPrinterFactoryProvider();
+ case CSV:
+ return format.getCSVPrinterFactoryProvider();
+ case ADM:
+ return format.getADMPrinterFactoryProvider();
+ case CLEAN_JSON:
+ return format.getCleanJSONPrinterFactoryProvider();
+ default:
+ throw new AlgebricksException("Unexpected OutputFormat: " + outputFormat);
+ }
+ }
+
+ protected void printJobSpec(Query rwQ, JobSpecification spec, SessionConfig conf, SessionOutput output)
+ throws AlgebricksException {
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
printPlanPrefix(output, "Hyracks job");
if (rwQ != null) {
try {
- output.out().println(
- new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
+ final ObjectWriter objectWriter = new ObjectMapper().writerWithDefaultPrettyPrinter();
+ output.out().println(objectWriter.writeValueAsString(spec.toJSON()));
} catch (IOException e) {
throw new AlgebricksException(e);
}
@@ -370,7 +380,6 @@
}
printPlanPostfix(output);
}
- return spec;
}
private AbstractLogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor(SessionConfig.PlanFormat planFormat,
@@ -390,7 +399,6 @@
double duration = (endTime - startTime) / 1000.00;
out.println("<pre>Duration: " + duration + " sec</pre>");
}
-
}
public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception {
@@ -499,12 +507,13 @@
}
// Validates if the query contains unsupported query parameters.
- private static void validateConfig(Map<String, String> config) throws AlgebricksException {
+ private static Map<String, String> validateConfig(Map<String, String> config) throws AlgebricksException {
for (String parameterName : config.keySet()) {
if (!CONFIGURABLE_PARAMETER_NAMES.contains(parameterName)) {
throw AsterixException.create(ErrorCode.COMPILATION_UNSUPPORTED_QUERY_PARAMETER, parameterName);
}
}
+ return config;
}
public static AlgebricksAbsolutePartitionConstraint getJobLocations(JobSpecification spec,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
index 89c4c76..ccda1e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -40,21 +41,20 @@
* a given query plan.
* @param computationLocations,
* the partitions for computation.
- * @param sortFrameLimit,
- * the frame limit for one sorter partition.
- * @param groupFrameLimit,
- * the frame limit for one group-by partition.
- * @param joinFrameLimit
- * the frame limit for one joiner partition.
- * @param frameSize
- * the frame size used in query execution.
+ * @param physicalOptimizationConfig,
+ * the configuration providing the frame size and the sort/group-by/join frame limits.
* @return the required cluster capacity for executing the query.
* @throws AlgebricksException
* if the query plan is malformed.
*/
public static IClusterCapacity getRequiredCapacity(ILogicalPlan plan,
- AlgebricksAbsolutePartitionConstraint computationLocations, int sortFrameLimit, int groupFrameLimit,
- int joinFrameLimit, int frameSize) throws AlgebricksException {
+ AlgebricksAbsolutePartitionConstraint computationLocations,
+ PhysicalOptimizationConfig physicalOptimizationConfig) throws AlgebricksException {
+ final int frameSize = physicalOptimizationConfig.getFrameSize();
+ final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
+ final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy();
+ final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin();
+
// Creates a cluster capacity visitor.
IClusterCapacity clusterCapacity = new ClusterCapacity();
RequiredCapacityVisitor visitor = new RequiredCapacityVisitor(computationLocations.getLocations().length,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 62337ad..f740d09 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -355,8 +355,7 @@
throws AlgebricksException {
DataSource source = findDataSource(dataSourceId);
Dataset dataset = ((DatasetDataSource) source).getDataset();
- String indexName = indexId;
- Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+ Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexId);
return (secondaryIndex != null)
? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this)
: null;
@@ -381,26 +380,19 @@
List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, JobSpecification jobSpec, Object implConfig) throws AlgebricksException {
- try {
- return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables,
- projectVariables, projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec,
- implConfig);
- } catch (AsterixException e) {
- throw new AlgebricksException(e);
- }
+ return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, projectVariables,
+ projectPushed, minFilterVars, maxFilterVars, opSchema, typeEnv, context, jobSpec, implConfig);
}
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
- AlgebricksPartitionConstraint constraint;
try {
- constraint = adapterFactory.getPartitionConstraint();
+ return new Pair<>(dataScanner, adapterFactory.getPartitionConstraint());
} catch (Exception e) {
throw new AlgebricksException(e);
}
- return new Pair<>(dataScanner, constraint);
}
public Dataverse findDataverse(String dataverseName) throws AlgebricksException {