[ASTERIXDB-2983][EXT] Warn on no-files for Parquet instead of failing
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- The Parquet adapter fails (IOException) when the external source
returns no files
- We should warn instead of failing, to be consistent with other
file formats (e.g., JSON)
- This patch also fixes inappropriate calls to
IWarningCollector#shouldWarn()
Change-Id: Ie929046ef01ea5eee60d23f6e4665ac6727e2d1e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14063
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wael Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
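For illustration, a minimal self-contained sketch of the intended behavior change (all names here are hypothetical stand-ins, not the actual AsterixDB call path):

import java.util.Collections;
import java.util.List;

class NoFilesBehaviorSketch {
    // Hypothetical stand-in for listing the files matched by the external
    // dataset definition (here: nothing matched, as in the new test case).
    static List<String> listMatchedFiles() {
        return Collections.emptyList();
    }

    static void configure() {
        List<String> files = listMatchedFiles();
        if (files.isEmpty()) {
            // Before this patch: Parquet split computation threw an IOException.
            // After this patch: proceed with zero splits and report a warning,
            // matching the behavior of the other file formats (e.g., JSON).
            System.err.println("WARN: The provided external dataset configuration"
                    + " returned no files from the external source");
            return;
        }
        // ... compute input splits for the matched files ...
    }

    public static void main(String[] args) {
        configure(); // warns; a subsequent COUNT(*) query then returns 0
    }
}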
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.1.ddl.sqlpp
new file mode 100644
index 0000000..2fc3e62
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : No files
+* Expected Res : Warning: The provided external dataset configuration returned no files from the external source
+* Date : November 14th 2021
+*/
+
+-- param max-warnings:json=1000
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE ParquetType as {
+};
+
+CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="NOT_A_DEFINITION"),
+ ("format" = "parquet")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.2.query.sqlpp
new file mode 100644
index 0000000..37de163
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/no-files/no-files.2.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : No files
+* Expected Res : Warning: The provided external dataset configuration returned no files from the external source
+* Date : November 14th 2021
+*/
+
+-- param max-warnings:json=1000
+
+USE test;
+
+SELECT VALUE COUNT(*)
+FROM ParquetDataset p
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/no-files/no-files.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/no-files/no-files.02.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/no-files/no-files.02.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 96d34ec..6369c15 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -163,6 +163,15 @@
<expected-error>ASX1161: Type 'ParquetType' contains declared fields, which is not supported for 'parquet' format</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/parquet/no-files">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/parquet/no-files</output-dir>
+ <source-location>false</source-location>
+ <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+ <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+ </compilation-unit>
+ </test-case>
<!-- Parquet Tests End -->
<test-case FilePath="external-dataset">
<compilation-unit name="common/empty-string-definition">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index f15f735..8ea9ed4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -54,6 +54,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
@@ -86,16 +87,16 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
- JobConf hdfsConf = createHdfsConf(serviceCtx, configuration, warningCollector.shouldWarn());
+ JobConf hdfsConf = createHdfsConf(serviceCtx, configuration);
configureHdfsConf(hdfsConf, configuration);
}
- protected JobConf createHdfsConf(IServiceContext serviceCtx, Map<String, String> configuration, boolean shouldWarn)
+ protected JobConf createHdfsConf(IServiceContext serviceCtx, Map<String, String> configuration)
throws HyracksDataException {
this.serviceCtx = serviceCtx;
this.configuration = configuration;
init((ICCServiceContext) serviceCtx);
- return HDFSUtils.configureHDFSJobConf(configuration, shouldWarn);
+ return HDFSUtils.configureHDFSJobConf(configuration);
}
protected void configureHdfsConf(JobConf conf, Map<String, String> configuration) throws AlgebricksException {
@@ -106,7 +107,7 @@
// if files list was set, we restrict the splits to the list
InputSplit[] inputSplits;
if (files == null) {
- inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+ inputSplits = getInputSplits(conf, numPartitions);
} else {
inputSplits = HDFSUtils.getSplits(conf, files);
}
@@ -119,12 +120,12 @@
read = new boolean[readSchedule.length];
Arrays.fill(read, false);
String formatString = configuration.get(ExternalDataConstants.KEY_FORMAT);
- if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)
- || formatString.equals(ExternalDataConstants.FORMAT_NOOP)
- || formatString.equals(ExternalDataConstants.FORMAT_PARQUET)) {
+ if (formatString == null || formatString.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
RecordReader<?, ?> reader = conf.getInputFormat().getRecordReader(inputSplits[0], conf, Reporter.NULL);
this.recordClass = reader.createValue().getClass();
reader.close();
+ } else if (formatString.equals(ExternalDataConstants.FORMAT_PARQUET)) {
+ recordClass = IValueReference.class;
} else {
recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
this.recordClass = char[].class;
@@ -134,6 +135,13 @@
}
}
+ private InputSplit[] getInputSplits(JobConf conf, int numPartitions) throws IOException {
+ if (HDFSUtils.isEmpty(conf)) {
+ return Scheduler.EMPTY_INPUT_SPLITS;
+ }
+ return conf.getInputFormat().getSplits(conf, numPartitions);
+ }
+
// Used to tell the factory to restrict the splits to the intersection between this list and
// the actual files on HDFS
@Override
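A self-contained sketch of the guard added above; the Hadoop JobConf is replaced by a plain Map so the example compiles on its own, and the property key is a placeholder, not the real constant behind KEY_HADOOP_INPUT_DIR:

import java.util.HashMap;
import java.util.Map;

class EmptyInputGuardSketch {
    // Shared zero-length sentinel, mirroring Scheduler.EMPTY_INPUT_SPLITS
    static final String[] EMPTY_INPUT_SPLITS = {};

    // Mirrors HDFSUtils.isEmpty(JobConf): an unset/empty input dir means no files
    static boolean isEmpty(Map<String, String> conf) {
        return conf.getOrDefault("input.dir", "").isEmpty(); // placeholder key
    }

    // Mirrors HDFSDataSourceFactory#getInputSplits: skip split computation
    // entirely when there is no input, instead of letting it throw
    static String[] getInputSplits(Map<String, String> conf, int numPartitions) {
        if (isEmpty(conf)) {
            return EMPTY_INPUT_SPLITS;
        }
        return new String[numPartitions]; // stand-in for getInputFormat().getSplits(...)
    }

    public static void main(String[] args) {
        System.out.println(getInputSplits(new HashMap<>(), 4).length); // 0, no exception
    }
}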
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 85d8671..803e657 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -50,7 +50,7 @@
putS3ConfToHadoopConf(configuration, path);
//Configure Hadoop S3 input splits
- JobConf conf = createHdfsConf(serviceCtx, configuration, warningCollector.shouldWarn());
+ JobConf conf = createHdfsConf(serviceCtx, configuration);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 0f9f484..ee765ce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -54,7 +54,7 @@
putAzureBlobConfToHadoopConf(configuration, path);
//Configure Hadoop Azure input splits
- JobConf conf = createHdfsConf(serviceCtx, configuration, warningCollector.shouldWarn());
+ JobConf conf = createHdfsConf(serviceCtx, configuration);
ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index 8fbe5c4..75d431d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -65,7 +65,7 @@
IWarningCollector warningCollector) throws AsterixException {
this.serviceCtx = serviceCtx;
this.configuration = configuration;
- JobConf conf = HDFSUtils.configureHDFSJobConf(configuration, warningCollector.shouldWarn());
+ JobConf conf = HDFSUtils.configureHDFSJobConf(configuration);
try {
confFactory = new ConfFactory(conf);
} catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
index d596493..c0a47d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
@@ -49,8 +49,8 @@
private final List<Warning> warnings;
private Map<String, FunctionCallInformation> funcInfo;
- public AsterixTypeToParquetTypeVisitor(boolean shouldWarn) {
- warnings = shouldWarn ? new ArrayList<>() : null;
+ public AsterixTypeToParquetTypeVisitor() {
+ warnings = new ArrayList<>();
}
public MessageType clipType(ARecordType rootType, MessageType fileSchema,
@@ -161,7 +161,7 @@
ATypeTag expectedType = node.getTypeTag();
boolean isNotExpected = actualType != expectedType;
- if (warnings != null && isNotExpected) {
+ if (isNotExpected) {
//typeName is unique
FunctionCallInformation info = funcInfo.get(node.getTypeName());
//If no warning is created, then it means it has been reported
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
index b06afd3..cc9b34c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetFileRecordReader.java
@@ -51,10 +51,8 @@
@Override
public void close() throws IOException {
super.close();
- if (warningCollector.shouldWarn()) {
- //report warnings
- HDFSUtils.issueWarnings(warningCollector, conf);
- }
+//Issue warnings, if any were reported
+ HDFSUtils.issueWarnings(warningCollector, conf);
}
@SuppressWarnings("unchecked")
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
index b2a5eeb..aac293d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/ParquetReadSupport.java
@@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
@@ -52,8 +51,7 @@
private static MessageType getRequestedSchema(InitContext context) {
Configuration configuration = context.getConfiguration();
MessageType fileSchema = context.getFileSchema();
- boolean shouldWarn = configuration.getBoolean(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_ENABLED, false);
- AsterixTypeToParquetTypeVisitor visitor = new AsterixTypeToParquetTypeVisitor(shouldWarn);
+ AsterixTypeToParquetTypeVisitor visitor = new AsterixTypeToParquetTypeVisitor();
try {
ARecordType expectedType = HDFSUtils.getExpectedType(configuration);
Map<String, FunctionCallInformation> functionCallInformationMap =
@@ -61,7 +59,7 @@
MessageType requestedType = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
List<Warning> warnings = visitor.getWarnings();
- if (shouldWarn && !warnings.isEmpty()) {
+ if (!warnings.isEmpty()) {
//New warnings were created, set the warnings in hadoop configuration to be reported
HDFSUtils.setWarnings(warnings, configuration);
//Update the reported warnings so that we do not report the same warning again
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index ec25997..16a0f66 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -60,7 +60,6 @@
public static final String KEY_HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static final String KEY_HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static final String KEY_HADOOP_BUFFER_SIZE = "io.file.buffer.size";
- public static final String KEY_HADOOP_ASTERIX_WARNINGS_ENABLED = "org.apache.asterix.warnings.enabled";
//Base64 encoded warnings issued from Hadoop
public static final String KEY_HADOOP_ASTERIX_WARNINGS_LIST = "org.apache.asterix.warnings.list";
//Disable caching FileSystem for Hadoop
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 9f65cd7..28a0766 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -201,7 +201,7 @@
}
}
- public static JobConf configureHDFSJobConf(Map<String, String> configuration, boolean shouldWarn) {
+ public static JobConf configureHDFSJobConf(Map<String, String> configuration) {
JobConf conf = new JobConf();
String localShortCircuitSocketPath = configuration.get(ExternalDataConstants.KEY_LOCAL_SOCKET_PATH);
String formatClassName = HDFSUtils.getInputFormatClassName(configuration);
@@ -235,11 +235,6 @@
//Subset of the values were requested, set the functionCallInformation
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
configuration.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION));
- /*
- * Allows Parquet to issue warnings in case we found type mismatches (if warnings are enabled).
- * Warnings will be issued during the type matching of Parquet's schema with the requested schema
- */
- conf.setBoolean(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_ENABLED, shouldWarn);
}
conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, requestedValues);
}
@@ -320,7 +315,7 @@
public static void issueWarnings(IWarningCollector warningCollector, Configuration conf) throws IOException {
String warnings = conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_LIST, "");
- if (!warnings.isEmpty() && warningCollector.shouldWarn()) {
+ if (!warnings.isEmpty()) {
String[] encodedWarnings = warnings.split(",");
Base64.Decoder decoder = Base64.getDecoder();
for (int i = 0; i < encodedWarnings.length; i++) {
@@ -330,7 +325,9 @@
*/
byte[] warningBytes = decoder.decode(encodedWarnings[i]);
DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(warningBytes));
- warningCollector.warn(Warning.create(dataInputStream));
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.create(dataInputStream));
+ }
}
//Remove reported warnings
conf.unset(ExternalDataConstants.KEY_HADOOP_ASTERIX_WARNINGS_LIST);
@@ -348,4 +345,14 @@
conf.set(String.format(ExternalDataConstants.KEY_HADOOP_DISABLE_FS_CACHE_TEMPLATE, protocol),
ExternalDataConstants.TRUE);
}
+
+ /**
+ * Checks whether the job's input path is empty (i.e., no input files were set)
+ *
+ * @param job Hadoop job configuration
+ * @return <code>true</code> if the input path is empty, <code>false</code> otherwise
+ */
+ public static boolean isEmpty(JobConf job) {
+ return job.get(ExternalDataConstants.KEY_HADOOP_INPUT_DIR, "").isEmpty();
+ }
}
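A sketch of the corrected IWarningCollector#shouldWarn() usage that this patch applies throughout (hypothetical interface shape, same contract): the pending-warning bookkeeping always runs, and shouldWarn() gates each individual warn() call rather than the whole code path:

import java.util.ArrayList;
import java.util.List;

class ShouldWarnUsageSketch {
    // Hypothetical collector with the same contract as IWarningCollector:
    // shouldWarn() may turn false mid-stream once the warning budget is spent.
    interface WarningCollector {
        boolean shouldWarn();
        void warn(String warning);
    }

    static void issueWarnings(WarningCollector collector, List<String> pending) {
        for (String warning : pending) {
            if (collector.shouldWarn()) { // gate each warn(), not the whole method
                collector.warn(warning);
            }
        }
        pending.clear(); // always drop reported warnings, whether warned or not
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>(List.of("type mismatch in Parquet schema"));
        issueWarnings(new WarningCollector() {
            public boolean shouldWarn() { return true; }
            public void warn(String w) { System.out.println("WARN: " + w); }
        }, pending);
    }
}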
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
index bc187f8..b9d68f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/Scheduler.java
@@ -49,16 +49,21 @@
* class works for Hadoop old API.
*/
public class Scheduler {
+ /**
+ * A shared zero-length array of input splits, returned when no input files exist
+ */
+ public static final InputSplit[] EMPTY_INPUT_SPLITS = {};
+
private static final Logger LOGGER = LogManager.getLogger();
/** a list of NCs */
private String[] NCs;
/** a map from ip to NCs */
- private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
+ private Map<String, List<String>> ipToNcMapping = new HashMap<>();
/** a map from the NC name to the index */
- private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+ private Map<String, Integer> ncNameToIndex = new HashMap<>();
/** a map from NC name to the NodeControllerInfo */
private Map<String, NodeControllerInfo> ncNameToNcInfos;
@@ -108,8 +113,7 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
+ * @param ncNameToNcInfos the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
@@ -121,10 +125,8 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
- * @param topology
- * the hyracks cluster toplogy
+ * @param ncNameToNcInfos the mapping from nc names to nc infos
+ * @param topology the hyracks cluster topology
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, ClusterTopology topology)
@@ -137,8 +139,7 @@
/**
* The constructor of the scheduler.
*
- * @param ncNameToNcInfos
- * the mapping from nc names to nc infos
+ * @param ncNameToNcInfos the mapping from nc names to nc infos
* @throws HyracksException
*/
public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos, INcCollectionBuilder ncCollectionBuilder)
@@ -156,14 +157,14 @@
* @throws HyracksDataException
*/
public String[] getLocationConstraints(InputSplit[] splits) throws HyracksException {
- if (splits == null) {
+ if (splits == null || splits == EMPTY_INPUT_SPLITS) {
/** deal with the case when the splits array is null or empty */
return new String[] {};
}
int[] workloads = new int[NCs.length];
Arrays.fill(workloads, 0);
String[] locations = new String[splits.length];
- Map<String, IntWritable> locationToNumOfSplits = new HashMap<String, IntWritable>();
+ Map<String, IntWritable> locationToNumOfSplits = new HashMap<>();
/**
* upper bound number of slots that a machine can get
*/
@@ -217,16 +218,11 @@
/**
* Schedule non-local slots to each machine
*
- * @param splits
- * The HDFS file splits.
- * @param workloads
- * The current capacity of each machine.
- * @param locations
- * The result schedule.
- * @param slotLimit
- * The maximum slots of each machine.
- * @param scheduled
- * Indicate which slot is scheduled.
+ * @param splits The HDFS file splits.
+ * @param workloads The current capacity of each machine.
+ * @param locations The result schedule.
+ * @param slotLimit The maximum slots of each machine.
+ * @param scheduled Indicate which slot is scheduled.
*/
private void scheduleNonLocalSlots(InputSplit[] splits, int[] workloads, String[] locations, int slotLimit,
boolean[] scheduled) throws IOException, UnknownHostException {
@@ -259,18 +255,12 @@
/**
* Schedule data-local slots to each machine.
*
- * @param splits
- * The HDFS file splits.
- * @param workloads
- * The current capacity of each machine.
- * @param locations
- * The result schedule.
- * @param slots
- * The maximum slots of each machine.
- * @param random
- * The random generator.
- * @param scheduled
- * Indicate which slot is scheduled.
+ * @param splits The HDFS file splits.
+ * @param workloads The current capacity of each machine.
+ * @param locations The result schedule.
+ * @param slots The maximum slots of each machine.
+ * @param random The random generator.
+ * @param scheduled Indicate which slot is scheduled.
* @throws IOException
* @throws UnknownHostException
*/
@@ -278,7 +268,7 @@
boolean[] scheduled, final Map<String, IntWritable> locationToNumSplits)
throws IOException, UnknownHostException {
/** scheduling candidates will be ordered inversely according to their popularity */
- PriorityQueue<String> scheduleCadndiates = new PriorityQueue<String>(3, new Comparator<String>() {
+ PriorityQueue<String> scheduleCadndiates = new PriorityQueue<>(3, new Comparator<>() {
@Override
public int compare(String s1, String s2) {
@@ -346,10 +336,8 @@
/**
* Scan the splits once and build a popularity map
*
- * @param splits
- * the split array
- * @param locationToNumOfSplits
- * the map to be built
+ * @param splits the split array
+ * @param locationToNumOfSplits the map to be built
* @throws IOException
*/
private void buildPopularityMap(InputSplit[] splits, Map<String, IntWritable> locationToNumOfSplits)
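A small sketch of the sentinel comparison used in getLocationConstraints above: because the factory hands back the one shared EMPTY_INPUT_SPLITS array, a reference-identity check is enough to take the early exit before any node-controller state is consulted:

class SentinelCheckSketch {
    static final String[] EMPTY_INPUT_SPLITS = {};

    static String[] getLocationConstraints(String[] splits) {
        if (splits == null || splits == EMPTY_INPUT_SPLITS) { // identity, not equals
            return new String[] {};
        }
        String[] locations = new String[splits.length];
        // ... assign each split to a node controller ...
        return locations;
    }

    public static void main(String[] args) {
        System.out.println(getLocationConstraints(EMPTY_INPUT_SPLITS).length); // 0
    }
}

Any other zero-length array would likely also fall through harmlessly (the scheduling loops iterate zero times), so the shared sentinel mainly documents intent and keeps the no-input case on an explicit fast path.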