[ASTERIXDB-3288][RT] COPY TO runtime - Part 2
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Add runtime directory path validation
- Fail if the writing path exists and
is not empty
- Changed file counter to have the maxmimum
number of digits that Java long can hold
instead of only 4-digits
Change-Id: I3bd18d6c77feaf90600f66a437facdbab925aef1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17890
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
index 18a97ad..349b1b1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.io.OutputStream;
-public class CloudOutputStream extends OutputStream {
+public final class CloudOutputStream extends OutputStream {
private final CloudResettableInputStream inputStream;
public CloudOutputStream(CloudResettableInputStream inputStream) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index f198001..0533184 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -139,7 +139,7 @@
try {
bufferedWriter.upload(this, writeBuffer.limit());
} catch (Exception e) {
- LOGGER.fatal(e);
+ LOGGER.error(e);
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 76a768a..7941ada 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -130,6 +130,8 @@
*/
boolean exists(String bucket, String path) throws HyracksDataException;
+ boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException;
+
/**
* Create a parallel downloader
*
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index a2aa21c..c23e437 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.cloud.clients.aws.s3;
-import java.io.Serializable;
import java.util.Map;
import org.apache.asterix.common.config.CloudProperties;
@@ -28,8 +27,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-public final class S3ClientConfig implements Serializable {
- private static final long serialVersionUID = 548292720313565948L;
+public final class S3ClientConfig {
// The maximum number of file that can be deleted (AWS restriction)
static final int DELETE_BATCH_SIZE = 1000;
private final String region;
@@ -55,17 +53,16 @@
public static S3ClientConfig of(Map<String, String> configuration) {
// Used to determine local vs. actual S3
- String endPoint = configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME);
+ String endPoint = configuration.getOrDefault(S3Constants.SERVICE_END_POINT_FIELD_NAME, "");
// Disabled
long profilerLogInterval = 0;
// Dummy values;
String region = "";
- String prefix = null;
+ String prefix = "";
boolean anonymousAuth = false;
- return new S3ClientConfig(region, configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME), "",
- anonymousAuth, profilerLogInterval);
+ return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval);
}
public String getRegion() {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientUtils.java
similarity index 88%
rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientUtils.java
index 82b0ae1..fe47537 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientUtils.java
@@ -31,9 +31,9 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
-public class S3CloudClientUtils {
+public class S3ClientUtils {
- private S3CloudClientUtils() {
+ private S3ClientUtils() {
throw new AssertionError("do not instantiate");
}
@@ -63,6 +63,13 @@
return files;
}
+ public static boolean isEmptyPrefix(S3Client s3Client, String bucket, String path) {
+ ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(bucket);
+ listObjectsBuilder.prefix(toCloudPrefix(path));
+
+ return s3Client.listObjectsV2(listObjectsBuilder.build()).contents().isEmpty();
+ }
+
public static String encodeURI(String path) {
if (path.isEmpty()) {
return path;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index b47a6ff..5cdf971 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -19,8 +19,8 @@
package org.apache.asterix.cloud.clients.aws.s3;
import static org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
-import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.encodeURI;
-import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.listS3Objects;
+import static org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.encodeURI;
+import static org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.listS3Objects;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -227,6 +227,12 @@
}
@Override
+ public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+ profiler.objectsList();
+ return S3ClientUtils.isEmptyPrefix(s3Client, bucket, path);
+ }
+
+ @Override
public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
return new S3ParallelDownloader(bucket, ioManager, config, profiler);
}
@@ -269,7 +275,7 @@
private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
Set<String> files = new HashSet<>();
for (S3Object s3Object : contents) {
- String path = config.isLocalS3Provider() ? S3CloudClientUtils.decodeURI(s3Object.key()) : s3Object.key();
+ String path = config.isLocalS3Provider() ? S3ClientUtils.decodeURI(s3Object.key()) : s3Object.key();
if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
files.add(path);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
new file mode 100644
index 0000000..64eee70
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.CloudOutputStream;
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+import com.google.common.base.Utf8;
+
+abstract class AbstractCloudExternalFileWriter implements IExternalFileWriter {
+ private final IExternalFilePrinter printer;
+ private final ICloudClient cloudClient;
+ private final String bucket;
+ private final boolean partitionedPath;
+ private final IWarningCollector warningCollector;
+ private final SourceLocation pathSourceLocation;
+ private final IWriteBufferProvider bufferProvider;
+ private ICloudBufferedWriter bufferedWriter;
+
+ AbstractCloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket,
+ boolean partitionedPath, IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+ this.printer = printer;
+ this.cloudClient = cloudClient;
+ this.bucket = bucket;
+ this.partitionedPath = partitionedPath;
+ this.warningCollector = warningCollector;
+ this.pathSourceLocation = pathSourceLocation;
+ bufferProvider = new WriterSingleBufferProvider();
+ }
+
+ @Override
+ public final void open() throws HyracksDataException {
+ printer.open();
+ }
+
+ @Override
+ public void validate(String directory) throws HyracksDataException {
+ if (partitionedPath && !cloudClient.isEmptyPrefix(bucket, directory)) {
+ throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, directory);
+ }
+ }
+
+ @Override
+ public final boolean newFile(String directory, String fileName) throws HyracksDataException {
+ String fullPath = directory + fileName;
+ if (isExceedingMaxLength(fullPath)) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.of(pathSourceLocation, ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH,
+ fullPath, getPathMaxLengthInBytes(), getAdapterName()));
+ }
+ return false;
+ }
+
+ bufferedWriter = cloudClient.createBufferedWriter(bucket, fullPath);
+ CloudResettableInputStream inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
+
+ CloudOutputStream outputStream = new CloudOutputStream(inputStream);
+ printer.newStream(outputStream);
+
+ return true;
+ }
+
+ @Override
+ public final void write(IValueReference value) throws HyracksDataException {
+ printer.print(value);
+ }
+
+ @Override
+ public final void abort() throws HyracksDataException {
+ bufferedWriter.abort();
+ printer.close();
+ }
+
+ @Override
+ public final void close() throws HyracksDataException {
+ printer.close();
+ }
+
+ abstract String getAdapterName();
+
+ abstract int getPathMaxLengthInBytes();
+
+ private boolean isExceedingMaxLength(String path) {
+ return Utf8.encodedLength(path) >= getPathMaxLengthInBytes();
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
deleted file mode 100644
index 22095ca..0000000
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.cloud.writer;
-
-import org.apache.asterix.cloud.CloudOutputStream;
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.IWriteBufferProvider;
-import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFileWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
-
-final class CloudExternalFileWriter implements IExternalFileWriter {
- private final IExternalFilePrinter printer;
- private final ICloudClient cloudClient;
- private final String bucket;
- private final IWriteBufferProvider bufferProvider;
- private ICloudBufferedWriter bufferedWriter;
-
- public CloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket) {
- this.printer = printer;
- this.cloudClient = cloudClient;
- this.bucket = bucket;
- bufferProvider = new WriterSingleBufferProvider();
- }
-
- @Override
- public void open() throws HyracksDataException {
- printer.open();
- }
-
- @Override
- public void newFile(String path) throws HyracksDataException {
- bufferedWriter = cloudClient.createBufferedWriter(bucket, path);
- CloudResettableInputStream inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
-
- CloudOutputStream outputStream = new CloudOutputStream(inputStream);
- printer.newStream(outputStream);
- }
-
- @Override
- public void write(IValueReference value) throws HyracksDataException {
- printer.print(value);
- }
-
- @Override
- public void abort() throws HyracksDataException {
- bufferedWriter.abort();
- printer.close();
- }
-
- @Override
- public void close() throws HyracksDataException {
- printer.close();
- }
-}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
new file mode 100644
index 0000000..5fd4ea6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+final class S3ExternalFileWriter extends AbstractCloudExternalFileWriter {
+
+ S3ExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket, boolean partitionedPath,
+ IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+ super(printer, cloudClient, bucket, partitionedPath, warningCollector, pathSourceLocation);
+ }
+
+ @Override
+ String getAdapterName() {
+ return ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3;
+ }
+
+ @Override
+ int getPathMaxLengthInBytes() {
+ return 1024;
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index 204d5da..d7a51cd 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -33,13 +33,17 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.logging.log4j.LogManager;
@@ -48,16 +52,30 @@
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
public final class S3ExternalFileWriterFactory implements IExternalFileWriterFactory {
- private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 4551318140901866805L;
- public static final IExternalFileFilterWriterFactoryProvider PROVIDER = S3ExternalFileWriterFactory::new;
- private final S3ClientConfig config;
+ private static final Logger LOGGER = LogManager.getLogger();
+ static final char SEPARATOR = '/';
+ public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
+ new IExternalFileFilterWriterFactoryProvider() {
+ @Override
+ public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+ return new S3ExternalFileWriterFactory(configuration);
+ }
+
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+ };
private final Map<String, String> configuration;
+ private final SourceLocation pathSourceLocation;
+ private final String staticPath;
private transient S3CloudClient cloudClient;
- private S3ExternalFileWriterFactory(Map<String, String> configuration) {
- this.config = S3ClientConfig.of(configuration);
- this.configuration = configuration;
+ private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+ configuration = externalConfig.getConfiguration();
+ pathSourceLocation = externalConfig.getPathSourceLocation();
+ staticPath = externalConfig.getStaticPath();
cloudClient = null;
}
@@ -66,14 +84,18 @@
throws HyracksDataException {
buildClient();
String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- return new CloudExternalFileWriter(printerFactory.createPrinter(), cloudClient, bucket);
+ IExternalFilePrinter printer = printerFactory.createPrinter();
+ IWarningCollector warningCollector = context.getWarningCollector();
+ return new S3ExternalFileWriter(printer, cloudClient, bucket, staticPath == null, warningCollector,
+ pathSourceLocation);
}
private void buildClient() throws HyracksDataException {
try {
synchronized (this) {
if (cloudClient == null) {
- // only a single client should be build
+ // only a single client should be built
+ S3ClientConfig config = S3ClientConfig.of(configuration);
cloudClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
}
}
@@ -83,32 +105,40 @@
}
@Override
- public char getFileSeparator() {
- return '/';
+ public char getSeparator() {
+ return SEPARATOR;
}
@Override
public void validate() throws AlgebricksException {
+ S3ClientConfig config = S3ClientConfig.of(configuration);
ICloudClient testClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
if (bucket == null || bucket.isEmpty()) {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
+
try {
doValidate(testClient, bucket);
} catch (IOException e) {
if (e.getCause() instanceof NoSuchBucketException) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
} else {
- LOGGER.fatal(e);
+ LOGGER.error(e);
throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
ExceptionUtils.getMessageOrToString(e));
}
}
}
- private static void doValidate(ICloudClient testClient, String bucket) throws IOException {
+ private void doValidate(ICloudClient testClient, String bucket) throws IOException, AlgebricksException {
+ if (staticPath != null && !testClient.isEmptyPrefix(bucket, staticPath)) {
+ // Ensure that the static path is empty
+ throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+ }
+
Random random = new Random();
String pathPrefix = "testFile";
String path = pathPrefix + random.nextInt();
@@ -127,6 +157,7 @@
stream.write(data, 0, data.length);
} catch (HyracksDataException e) {
stream.abort();
+ aborted = true;
} finally {
if (stream != null && !aborted) {
stream.finish();
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
similarity index 96%
rename from asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
rename to asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
index 612aa1d..92d7f12 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apach.asterix.cloud;
+package org.apache.asterix.cloud;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.WriteBufferProvider;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.logging.log4j.LogManager;
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
similarity index 97%
rename from asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java
rename to asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 06286cc..0452da0 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apach.asterix.cloud.s3;
+package org.apache.asterix.cloud.s3;
import java.net.URI;
-import org.apach.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.LSMTest;
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
import org.junit.AfterClass;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9ac8513..0c3d8ca 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -89,6 +89,10 @@
INVALID_PARQUET_FILE(59),
TYPE_MISMATCH_EXTRA_FIELD(60),
TYPE_MISMATCH_MISSING_FIELD(61),
+ DIRECTORY_IS_NOT_EMPTY(62),
+ COULD_NOT_CREATE_FILE(63),
+ NON_STRING_WRITE_PATH(64),
+ WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH(65),
UNSUPPORTED_JRE(100),
@@ -282,6 +286,8 @@
UNKNOWN_DATABASE(1185),
DATABASE_EXISTS(1186),
CANNOT_DROP_DATABASE_DEPENDENT_EXISTS(1187),
+ UNSUPPORTED_WRITING_ADAPTER(1188),
+ UNSUPPORTED_WRITING_FORMAT(1189),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 122bff6..1d48e1e 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -96,6 +96,10 @@
59 = Invalid Parquet file: %1$s. Reason: %2$s
60 = Type mismatch: including an extra field %1$s
61 = Type mismatch: missing a required field %1$s: %2$s
+62 = Cannot write to a non-empty directory '%1$s'
+63 = Could not create file '%1$s'
+64 = Path expression produced a value of type '%1$s'. Path must be of type string
+65 = Length of the file path '%1$s' exceeds the maximum length of '%2$s bytes' allowed in %3$s
100 = Unsupported JRE: %1$s
@@ -284,6 +288,8 @@
1185 = Cannot find database with name %1$s
1186 = A database with this name %1$s already exists
1187 = Cannot drop database: %1$s %2$s being used by %3$s %4$s
+1188 = Unsupported writing adapter '%1$s'. Supported adapters: %2$s
+1189 = Unsupported writing format '%1$s'. Supported formats: %2$s
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 7428a4b..7c30b64 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.util;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Set;
import java.util.TimeZone;
import java.util.function.LongSupplier;
@@ -207,23 +205,13 @@
public static final String FORMAT_PARQUET = "parquet";
public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
public static final Set<String> ALL_FORMATS;
+ public static final Set<String> TEXTUAL_FORMATS;
static {
- Set<String> formats = new HashSet<>(14);
- formats.add(FORMAT_BINARY);
- formats.add(FORMAT_ADM);
- formats.add(FORMAT_JSON_LOWER_CASE);
- formats.add(FORMAT_DELIMITED_TEXT);
- formats.add(FORMAT_TWEET);
- formats.add(FORMAT_RSS);
- formats.add(FORMAT_SEMISTRUCTURED);
- formats.add(FORMAT_LINE_SEPARATED);
- formats.add(FORMAT_HDFS_WRITABLE);
- formats.add(FORMAT_KV);
- formats.add(FORMAT_CSV);
- formats.add(FORMAT_TSV);
- formats.add(FORMAT_PARQUET);
- ALL_FORMATS = Collections.unmodifiableSet(formats);
+ ALL_FORMATS = Set.of(FORMAT_BINARY, FORMAT_ADM, FORMAT_JSON_LOWER_CASE, FORMAT_DELIMITED_TEXT, FORMAT_TWEET,
+ FORMAT_RSS, FORMAT_SEMISTRUCTURED, FORMAT_LINE_SEPARATED, FORMAT_HDFS_WRITABLE, FORMAT_KV, FORMAT_CSV,
+ FORMAT_TSV, FORMAT_PARQUET);
+ TEXTUAL_FORMATS = Set.of(FORMAT_ADM, FORMAT_JSON_LOWER_CASE, FORMAT_CSV, FORMAT_TSV);
}
/**
@@ -311,6 +299,27 @@
public static final String PREFIX_DEFAULT_DELIMITER = "/";
public static final Pattern COMPUTED_FIELD_PATTERN = Pattern.compile("\\{[^{}:]+:[^{}:]+}");
+ /**
+ * Compression constants
+ */
+ public static final String KEY_COMPRESSION_GZIP = "gzip";
+
+ /**
+ * Writer Constants
+ */
+ public static final String KEY_WRITER_MAX_RESULT = "max-objects-per-file";
+ public static final String KEY_WRITER_COMPRESSION = "compression";
+ public static final int WRITER_MAX_RESULT_DEFAULT = 1000;
+ public static final Set<String> WRITER_SUPPORTED_FORMATS;
+ public static final Set<String> WRITER_SUPPORTED_ADAPTERS;
+ public static final Set<String> WRITER_SUPPORTED_COMPRESSION;
+
+ static {
+ WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
+ WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
+ WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
+ }
+
public static class ParquetOptions {
private ParquetOptions() {
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
new file mode 100644
index 0000000..4c848b1
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class WriterValidationUtil {
+
+ private WriterValidationUtil() {
+ }
+
+ public static void validateWriterConfiguration(String adapter, Set<String> supportedAdapters,
+ Map<String, String> configuration, SourceLocation sourceLocation) throws CompilationException {
+ validateAdapter(adapter, supportedAdapters, sourceLocation);
+ validateFormat(configuration, sourceLocation);
+ validateCompression(configuration, sourceLocation);
+ validateMaxResult(configuration, sourceLocation);
+ }
+
+ private static void validateAdapter(String adapter, Set<String> supportedAdapters, SourceLocation sourceLocation)
+ throws CompilationException {
+ checkSupported(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter, supportedAdapters,
+ ErrorCode.UNSUPPORTED_WRITING_ADAPTER, sourceLocation, false);
+ }
+
+ private static void validateFormat(Map<String, String> configuration, SourceLocation sourceLocation)
+ throws CompilationException {
+ String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ checkSupported(ExternalDataConstants.KEY_FORMAT, format, ExternalDataConstants.WRITER_SUPPORTED_FORMATS,
+ ErrorCode.UNSUPPORTED_WRITING_FORMAT, sourceLocation, false);
+ }
+
+ private static void validateCompression(Map<String, String> configuration, SourceLocation sourceLocation)
+ throws CompilationException {
+ String compression = configuration.get(ExternalDataConstants.KEY_WRITER_COMPRESSION);
+ checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, compression,
+ ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, ErrorCode.UNKNOWN_COMPRESSION_SCHEME,
+ sourceLocation, true);
+ }
+
+ private static void validateMaxResult(Map<String, String> configuration, SourceLocation sourceLocation)
+ throws CompilationException {
+ String maxResult = configuration.get(ExternalDataConstants.KEY_WRITER_MAX_RESULT);
+ if (maxResult == null) {
+ return;
+ }
+
+ try {
+ Integer.parseInt(maxResult);
+ } catch (NumberFormatException e) {
+ throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, maxResult);
+ }
+ }
+
+ private static void checkSupported(String paramKey, String value, Set<String> supportedSet, ErrorCode errorCode,
+ SourceLocation sourceLocation, boolean optional) throws CompilationException {
+ if (optional && value == null) {
+ return;
+ }
+
+ if (value == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, sourceLocation, paramKey);
+ }
+
+ String normalizedValue = value.toLowerCase();
+ if (!supportedSet.contains(normalizedValue)) {
+ throw CompilationException.create(errorCode, sourceLocation, value, supportedSet.toString());
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/ILocalFSValidator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/ILocalFSValidator.java
new file mode 100644
index 0000000..c026607
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/ILocalFSValidator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+interface ILocalFSValidator {
+ void validate(String directory, SourceLocation sourceLocation) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
index e8983d8..a3a2f70 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -23,17 +23,25 @@
import java.io.FileOutputStream;
import java.io.IOException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.runtime.writer.IExternalFilePrinter;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IValueReference;
final class LocalFSExternalFileWriter implements IExternalFileWriter {
private final IExternalFilePrinter printer;
+ private final ILocalFSValidator validator;
+ private final SourceLocation pathSourceLocation;
- LocalFSExternalFileWriter(IExternalFilePrinter printer) {
+ LocalFSExternalFileWriter(IExternalFilePrinter printer, ILocalFSValidator validator,
+ SourceLocation pathSourceLocation) {
this.printer = printer;
+ this.validator = validator;
+ this.pathSourceLocation = pathSourceLocation;
}
@Override
@@ -42,18 +50,24 @@
}
@Override
- public void newFile(String path) throws HyracksDataException {
+ public void validate(String directory) throws HyracksDataException {
+ validator.validate(directory, pathSourceLocation);
+ }
+
+ @Override
+ public boolean newFile(String directory, String fileName) throws HyracksDataException {
try {
- File currentFile = new File(path);
- if (currentFile.exists()) {
- currentFile.delete();
- }
+ File parentDirectory = new File(directory);
+ File currentFile = new File(parentDirectory, fileName);
FileUtils.createParentDirectories(currentFile);
- currentFile.createNewFile();
+ if (!currentFile.createNewFile()) {
+ throw RuntimeDataException.create(ErrorCode.COULD_NOT_CREATE_FILE, currentFile.getAbsolutePath());
+ }
printer.newStream(new BufferedOutputStream(new FileOutputStream(currentFile)));
} catch (IOException e) {
throw HyracksDataException.create(e);
}
+ return true;
}
@Override
@@ -62,12 +76,12 @@
}
@Override
- public void abort() {
+ public void abort() throws HyracksDataException {
printer.close();
}
@Override
- public void close() {
+ public void close() throws HyracksDataException {
printer.close();
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index d206b2a..313757a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -20,31 +20,106 @@
import java.io.File;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public final class LocalFSExternalFileWriterFactory implements IExternalFileWriterFactory {
private static final long serialVersionUID = 871685327574547749L;
- public static final IExternalFileFilterWriterFactoryProvider PROVIDER = c -> new LocalFSExternalFileWriterFactory();
+ private static final char SEPARATOR = File.separatorChar;
+ public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
+ new IExternalFileFilterWriterFactoryProvider() {
+ @Override
+ public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+ return new LocalFSExternalFileWriterFactory(configuration);
+ }
- private LocalFSExternalFileWriterFactory() {
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+ };
+ private static final ILocalFSValidator NO_OP_VALIDATOR = LocalFSExternalFileWriterFactory::noOpValidation;
+ private static final ILocalFSValidator VALIDATOR = LocalFSExternalFileWriterFactory::validate;
+ private final SourceLocation pathSourceLocation;
+ private final boolean singleNodeCluster;
+ private final String staticPath;
+ private boolean validated;
+
+ private LocalFSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+ pathSourceLocation = externalConfig.getPathSourceLocation();
+ singleNodeCluster = externalConfig.isSingleNodeCluster();
+ staticPath = externalConfig.getStaticPath();
+ validated = false;
}
@Override
- public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory) {
- return new LocalFSExternalFileWriter(printerFactory.createPrinter());
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ throws HyracksDataException {
+ ILocalFSValidator validator = VALIDATOR;
+ if (staticPath != null) {
+ synchronized (this) {
+ validateStaticPath();
+ }
+ validator = NO_OP_VALIDATOR;
+ }
+ return new LocalFSExternalFileWriter(printerFactory.createPrinter(), validator, pathSourceLocation);
}
@Override
- public char getFileSeparator() {
- return File.separatorChar;
+ public char getSeparator() {
+ return SEPARATOR;
}
@Override
- public void validate() {
+ public void validate() throws AlgebricksException {
+ // A special case validation for a single node cluster
+ if (singleNodeCluster && staticPath != null) {
+ if (isNonEmptyDirectory(new File(staticPath))) {
+ throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+ }
+
+ // Ensure that it is not validated again in a single node cluster
+ validated = true;
+ }
+ }
+
+ private void validateStaticPath() throws HyracksDataException {
+ if (validated) {
+ return;
+ }
+
+ if (isNonEmptyDirectory(new File(staticPath))) {
+ throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+ }
+ validated = true;
+ }
+
+ static boolean isNonEmptyDirectory(File parentDirectory) {
+ if (parentDirectory.exists()) {
+ String[] files = parentDirectory.list();
+ return files != null && files.length != 0;
+ }
+
+ return false;
+ }
+
+ private static void noOpValidation(String directory, SourceLocation sourceLocation) {
// NoOp
}
+
+ private static void validate(String directory, SourceLocation sourceLocation) throws HyracksDataException {
+ if (isNonEmptyDirectory(new File(directory))) {
+ throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, sourceLocation, directory);
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
index bff0db8..8f8c63a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -30,6 +30,7 @@
final class TextualExternalFilePrinter implements IExternalFilePrinter {
private final IPrinter printer;
private final IExternalFileCompressStreamFactory compressStreamFactory;
+ private TextualOutputStreamDelegate delegate;
private PrintStream printStream;
TextualExternalFilePrinter(IPrinter printer, IExternalFileCompressStreamFactory compressStreamFactory) {
@@ -45,22 +46,26 @@
@Override
public void newStream(OutputStream outputStream) throws HyracksDataException {
if (printStream != null) {
- printStream.close();
+ close();
}
- printStream = new PrintStream(compressStreamFactory.createStream(outputStream));
+ delegate = new TextualOutputStreamDelegate(compressStreamFactory.createStream(outputStream));
+ printStream = new PrintStream(delegate);
}
@Override
public void print(IValueReference value) throws HyracksDataException {
printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
printStream.println();
+ delegate.checkError();
}
@Override
- public void close() {
- printStream.close();
- if (printStream.checkError()) {
- throw new IllegalStateException("Print error. Check the logs for more information");
+ public void close() throws HyracksDataException {
+ if (printStream != null) {
+ printStream.close();
+ printStream = null;
+ delegate.checkError();
+ delegate = null;
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
new file mode 100644
index 0000000..2ec2661
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * The purpose of this is to catch and check an error that could be thrown by {@link #stream} when printing results.
+ * {@link java.io.PrintStream} doesn't throw any exception on write. Hence, we can miss all errors thrown during
+ * write operations.
+ *
+ * @see TextualExternalFilePrinter#print(IValueReference)
+ */
+final class TextualOutputStreamDelegate extends OutputStream {
+ private final OutputStream stream;
+ private Exception exception;
+
+ TextualOutputStreamDelegate(OutputStream stream) {
+ this.stream = Objects.requireNonNull(stream);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ try {
+ stream.write(b, off, len);
+ } catch (Exception e) {
+ this.exception = e;
+ throw e;
+ }
+
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ try {
+ stream.write(b);
+ } catch (Exception e) {
+ this.exception = e;
+ throw e;
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ try {
+ stream.write(b);
+ } catch (Exception e) {
+ this.exception = e;
+ throw e;
+ }
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ try {
+ stream.flush();
+ } catch (Exception e) {
+ this.exception = e;
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ stream.close();
+ } catch (Exception e) {
+ this.exception = e;
+ throw e;
+ }
+ }
+
+ void checkError() throws HyracksDataException {
+ if (exception != null) {
+ throw HyracksDataException.create(exception);
+ }
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
index 88153de..bacf434 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
@@ -18,66 +18,52 @@
*/
package org.apache.asterix.runtime.writer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
abstract class AbstractPathResolver implements IPathResolver {
- // TODO is 4-digits enough?
- // TODO do we need jobId?
//4-digit format for the partition number, jobId, and file counter
- private static final int NUMBER_OF_DIGITS = 4;
- private static final String FILE_FORMAT = "%0" + NUMBER_OF_DIGITS + "d";
+ private static final int PREFIX_NUMBER_OF_DIGITS = 4;
+ private static final String PREFIX_FILE_FORMAT = "%0" + PREFIX_NUMBER_OF_DIGITS + "d";
+ private static final String FILE_COUNTER_FORMAT = "%0" + getMaxNumberOfDigits() + "d";
private final String fileExtension;
- private final char fileSeparator;
- private final int partition;
- private final long jobId;
- private final StringBuilder pathStringBuilder;
- private int fileCounter;
+ private final int fileCounterOffset;
+ private final StringBuilder fileStringBuilder;
+ protected final char fileSeparator;
+ private long fileCounter;
- AbstractPathResolver(String fileExtension, char fileSeparator, int partition, long jobId) {
+ AbstractPathResolver(String fileExtension, char fileSeparator, int partition) {
this.fileExtension = fileExtension;
this.fileSeparator = fileSeparator;
- this.partition = partition;
- this.jobId = jobId;
- pathStringBuilder = new StringBuilder();
+ fileStringBuilder = new StringBuilder();
+ fileCounterOffset = initFileBuilder(fileStringBuilder, partition, fileExtension);
fileCounter = 0;
}
@Override
- public final String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException {
- fileCounter = 0;
- pathStringBuilder.setLength(0);
- appendPrefix(pathStringBuilder, tuple);
- if (pathStringBuilder.charAt(pathStringBuilder.length() - 1) != fileSeparator) {
- pathStringBuilder.append(fileSeparator);
- }
- pathStringBuilder.append(String.format(FILE_FORMAT, partition));
- pathStringBuilder.append('-');
- pathStringBuilder.append(String.format(FILE_FORMAT, jobId));
- pathStringBuilder.append('-');
- pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
+ public final String getNextFileName() {
+ fileStringBuilder.setLength(fileCounterOffset);
+ fileStringBuilder.append(String.format(FILE_COUNTER_FORMAT, fileCounter++));
if (fileExtension != null && !fileExtension.isEmpty()) {
- pathStringBuilder.append('.');
- pathStringBuilder.append(fileExtension);
+ fileStringBuilder.append('.');
+ fileStringBuilder.append(fileExtension);
}
- return pathStringBuilder.toString();
+ return fileStringBuilder.toString();
}
- @Override
- public final String getNextPath() {
- int numOfCharToRemove = NUMBER_OF_DIGITS;
+ private static int initFileBuilder(StringBuilder fileStringBuilder, int partition, String fileExtension) {
+ fileStringBuilder.append(String.format(PREFIX_FILE_FORMAT, partition));
+ fileStringBuilder.append('-');
+ int offset = fileStringBuilder.length();
+ // dummy
+ fileStringBuilder.append(String.format(FILE_COUNTER_FORMAT, 0));
if (fileExtension != null && !fileExtension.isEmpty()) {
- numOfCharToRemove += 1 + fileExtension.length();
+ fileStringBuilder.append('.');
+ fileStringBuilder.append(fileExtension);
}
- pathStringBuilder.setLength(pathStringBuilder.length() - numOfCharToRemove);
- pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
- if (fileExtension != null && !fileExtension.isEmpty()) {
- pathStringBuilder.append('.');
- pathStringBuilder.append(fileExtension);
- }
- return pathStringBuilder.toString();
+ return offset;
}
- abstract void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException;
+ private static int getMaxNumberOfDigits() {
+ return Long.toString(Long.MAX_VALUE).length();
+ }
+
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index dbd97e4..a1d2411 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -20,10 +20,13 @@
import java.io.UTFDataFormatException;
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.util.string.UTF8CharBuffer;
@@ -31,26 +34,44 @@
final class DynamicPathResolver extends AbstractPathResolver {
private final IScalarEvaluator pathEval;
- private final String inappropriatePartitionPath;
+ private final IWarningCollector warningCollector;
+ private final StringBuilder dirStringBuilder;
private final VoidPointable pathResult;
private final UTF8CharBuffer charBuffer;
+ private final SourceLocation pathSourceLocation;
- DynamicPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, IScalarEvaluator pathEval,
- String inappropriatePartitionPath, IWarningCollector warningCollector) {
- super(fileExtension, fileSeparator, partition, jobId);
+ DynamicPathResolver(String fileExtension, char fileSeparator, int partition, IScalarEvaluator pathEval,
+ IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+ super(fileExtension, fileSeparator, partition);
this.pathEval = pathEval;
- this.inappropriatePartitionPath = inappropriatePartitionPath;
+ this.warningCollector = warningCollector;
+ this.pathSourceLocation = pathSourceLocation;
+ dirStringBuilder = new StringBuilder();
pathResult = new VoidPointable();
charBuffer = new UTF8CharBuffer();
}
@Override
- void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
+ public String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException {
+ if (!appendPrefix(tuple)) {
+ return ExternalWriter.UNRESOLVABLE_PATH;
+ }
+
+ if (dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
+ dirStringBuilder.append(fileSeparator);
+ }
+ return dirStringBuilder.toString();
+ }
+
+ private boolean appendPrefix(IFrameTupleReference tuple) throws HyracksDataException {
+ dirStringBuilder.setLength(0);
pathEval.evaluate(tuple, pathResult);
- if (PointableHelper.isNullOrMissing(pathResult)) {
- // TODO warn
- pathStringBuilder.append(inappropriatePartitionPath);
- return;
+ ATypeTag typeTag = ATypeTag.VALUE_TYPE_MAPPING[pathResult.getByteArray()[0]];
+ if (typeTag != ATypeTag.STRING) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.of(pathSourceLocation, ErrorCode.NON_STRING_WRITE_PATH, typeTag));
+ }
+ return false;
}
try {
@@ -58,6 +79,7 @@
} catch (UTFDataFormatException e) {
throw HyracksDataException.create(e);
}
- pathStringBuilder.append(charBuffer.getBuffer(), 0, charBuffer.getFilledLength());
+ dirStringBuilder.append(charBuffer.getBuffer(), 0, charBuffer.getFilledLength());
+ return true;
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
new file mode 100644
index 0000000..b62a07a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class ExternalFileWriterConfiguration {
+ private final Map<String, String> configuration;
+ private final SourceLocation pathSourceLocation;
+ private final String staticPath;
+ private final boolean singleNodeCluster;
+
+ public ExternalFileWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
+ String staticPath, boolean singleNodeCluster) {
+ this.configuration = configuration;
+ this.pathSourceLocation = pathSourceLocation;
+ this.staticPath = staticPath;
+ this.singleNodeCluster = singleNodeCluster;
+ }
+
+ public Map<String, String> getConfiguration() {
+ return configuration;
+ }
+
+ public SourceLocation getPathSourceLocation() {
+ return pathSourceLocation;
+ }
+
+ public String getStaticPath() {
+ return staticPath;
+ }
+
+ public boolean isSingleNodeCluster() {
+ return singleNodeCluster;
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
index a4af805..5fc07af 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
@@ -24,9 +24,11 @@
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
final class ExternalWriter implements IExternalWriter {
+ static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
private final IPathResolver pathResolver;
private final IExternalFileWriter writer;
private final int maxResultPerFile;
+ private String partitionPath;
private int tupleCounter;
public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
@@ -42,17 +44,23 @@
@Override
public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
- tupleCounter = 0;
- writer.newFile(pathResolver.getPartitionPath(tuple));
+ partitionPath = pathResolver.getPartitionDirectory(tuple);
+ if (UNRESOLVABLE_PATH != partitionPath) {
+ writer.validate(partitionPath);
+ newFile();
+ }
}
@Override
public void write(IValueReference value) throws HyracksDataException {
+ if (UNRESOLVABLE_PATH == partitionPath) {
+ // Ignore writing values for unresolvable partition paths
+ return;
+ }
writer.write(value);
tupleCounter++;
if (tupleCounter >= maxResultPerFile) {
- tupleCounter = 0;
- writer.newFile(pathResolver.getNextPath());
+ newFile();
}
}
@@ -65,4 +73,12 @@
public void close() throws HyracksDataException {
writer.close();
}
+
+ private void newFile() throws HyracksDataException {
+ tupleCounter = 0;
+ if (!writer.newFile(partitionPath, pathResolver.getNextFileName())) {
+ // the partitionPath could contain illegal chars or the length of the total path is too long
+ partitionPath = UNRESOLVABLE_PATH;
+ }
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
index 350c5d6..e7c0db0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public class ExternalWriterFactory implements IExternalWriterFactory {
private static final long serialVersionUID = 1412969574113419638L;
@@ -34,35 +35,34 @@
private final String fileExtension;
private final int maxResult;
private final IScalarEvaluatorFactory pathEvalFactory;
- private final String inappropriatePartitionPath;
private final String staticPath;
+ private final SourceLocation pathSourceLocation;
public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
- String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory,
- String inappropriatePartitionPath, String staticPath) {
+ String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
+ SourceLocation pathSourceLocation) {
this.writerFactory = writerFactory;
this.printerFactory = printerFactory;
this.fileExtension = fileExtension;
this.maxResult = maxResult;
this.pathEvalFactory = pathEvalFactory;
- this.inappropriatePartitionPath = inappropriatePartitionPath;
this.staticPath = staticPath;
+ this.pathSourceLocation = pathSourceLocation;
}
@Override
public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
int partition = context.getTaskAttemptId().getTaskId().getPartition();
- long jobId = context.getJobletContext().getJobId().getId();
- char fileSeparator = writerFactory.getFileSeparator();
+ char fileSeparator = writerFactory.getSeparator();
IPathResolver resolver;
if (staticPath == null) {
EvaluatorContext evaluatorContext = new EvaluatorContext(context);
IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
IWarningCollector warningCollector = context.getWarningCollector();
- resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, jobId, pathEval,
- inappropriatePartitionPath, warningCollector);
+ resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
+ pathSourceLocation);
} else {
- resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, jobId, staticPath);
+ resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
}
IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
return new ExternalWriter(resolver, writer, maxResult);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
index 2de5e11..7a863f7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
@@ -18,9 +18,8 @@
*/
package org.apache.asterix.runtime.writer;
-import java.util.Map;
-
-@FunctionalInterface
public interface IExternalFileFilterWriterFactoryProvider {
- IExternalFileWriterFactory create(Map<String, String> configuration);
+ IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration);
+
+ char getSeparator();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
index fb89f4a..ba5fa1d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
@@ -50,5 +50,5 @@
/**
* Flush and close the printer
*/
- void close();
+ void close() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
index c07e8cb..f8ae81b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -32,11 +32,20 @@
void open() throws HyracksDataException;
/**
+ * Validate the writing directory
+ *
+ * @param directory to write
+ */
+ void validate(String directory) throws HyracksDataException;
+
+ /**
* Initialize the writer to write to a new path
*
- * @param path of the file to writer (including the file name)
+ * @param directory of where to write the file
+ * @param fileName of the file name to create
+ * @return true if a new file can be created, false otherwise
*/
- void newFile(String path) throws HyracksDataException;
+ boolean newFile(String directory, String fileName) throws HyracksDataException;
/**
* Writer the provided value
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
index 2ab6dac..d8f1f84 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -42,7 +42,7 @@
/**
* @return file (or path) separator
*/
- char getFileSeparator();
+ char getSeparator();
/**
* Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
index 2419210..35b4ddd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
@@ -32,10 +32,11 @@
* @param tuple contains the partitioning values
* @return the final path which includes the partitioning values
*/
- String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException;
+ String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException;
/**
* @return the path of the next file to be written for the same partition
*/
- String getNextPath();
+ String getNextFileName();
+
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
index 34c6575..4caf44f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
@@ -18,19 +18,23 @@
*/
package org.apache.asterix.runtime.writer;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
final class StaticPathResolver extends AbstractPathResolver {
- private final String prefix;
+ private final String directoryPath;
- StaticPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, String prefix) {
- super(fileExtension, fileSeparator, partition, jobId);
- this.prefix = prefix;
+ StaticPathResolver(String fileExtension, char fileSeparator, int partition, String directoryPath) {
+ super(fileExtension, fileSeparator, partition);
+
+ if (directoryPath.charAt(directoryPath.length() - 1) != fileSeparator) {
+ this.directoryPath = directoryPath + fileSeparator;
+ } else {
+ this.directoryPath = directoryPath;
+ }
}
@Override
- void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
- pathStringBuilder.append(prefix);
+ public String getPartitionDirectory(IFrameTupleReference tuple) {
+ return directoryPath;
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
index e96531f..01e137b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,6 +47,7 @@
private FrameTupleAccessor tupleAccessor;
private FrameTupleReference tupleRef;
private boolean first;
+ private IFrameWriter frameWriter;
SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
RecordDescriptor inputRecordDesc, IExternalWriter writer) {
@@ -68,6 +70,7 @@
tupleAccessor = new FrameTupleAccessor(inputRecordDesc);
tupleRef = new FrameTupleReference();
}
+ this.frameWriter.open();
}
@Override
@@ -88,11 +91,18 @@
@Override
public void fail() throws HyracksDataException {
writer.abort();
+ frameWriter.fail();
}
@Override
public void close() throws HyracksDataException {
writer.close();
+ frameWriter.close();
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter frameWriter, RecordDescriptor recordDesc) {
+ this.frameWriter = frameWriter;
}
private boolean isNewPartition(int index) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7291473..af95280 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -154,6 +154,7 @@
PARSING_ERROR(124),
INVALID_INVERTED_LIST_TYPE_TRAITS(125),
ILLEGAL_STATE(126),
+ UNSUPPORTED_WRITE_SPEC(127),
// Compilation error codes.
RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 4d9c60b..b2e6312 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -144,6 +144,7 @@
124 = Parsing error %s: %s
125 = Invalid inverted list type traits: %1$s
126 = Illegal state. %1$s
+127 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s