[ASTERIXDB-3288][RT] Introduce COPY TO runtime
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Introduce the necessary writers for COPY TO
Change-Id: Ie107e707189bdb7fc17b09439a08d47192cfa61f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17878
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 9e840c2..79ee4c1 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -70,6 +70,11 @@
<artifactId>asterix-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- aws s3 start -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 8c7cd8a..dc8bc68 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -59,7 +59,7 @@
//TODO(DB): change
private final String metadataNamespacePath;
protected final ICloudClient cloudClient;
- protected final WriteBufferProvider writeBufferProvider;
+ protected final IWriteBufferProvider writeBufferProvider;
protected final String bucket;
protected final Set<Integer> partitions;
protected final List<FileReference> partitionPaths;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 8572014..14c44ad 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -30,7 +30,7 @@
private final CloudResettableInputStream inputStream;
public CloudFileHandle(ICloudClient cloudClient, String bucket, FileReference fileRef,
- WriteBufferProvider bufferProvider) {
+ IWriteBufferProvider bufferProvider) {
super(fileRef);
ICloudBufferedWriter bufferedWriter = cloudClient.createBufferedWriter(bucket, fileRef.getRelativePath());
inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
new file mode 100644
index 0000000..18a97ad
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class CloudOutputStream extends OutputStream {
+ private final CloudResettableInputStream inputStream;
+
+ public CloudOutputStream(CloudResettableInputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ inputStream.write(b, off, len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ inputStream.write(b);
+ }
+
+ @Override
+ public void close() throws IOException {
+ inputStream.finish();
+ }
+
+ public void abort() throws IOException {
+ inputStream.abort();
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 7ba95c5..f198001 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -24,16 +24,19 @@
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class CloudResettableInputStream extends InputStream {
+ private static final Logger LOGGER = LogManager.getLogger();
// TODO: make configurable
public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024;
- private final WriteBufferProvider bufferProvider;
+ private final IWriteBufferProvider bufferProvider;
private ByteBuffer writeBuffer;
private final ICloudBufferedWriter bufferedWriter;
- public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, WriteBufferProvider bufferProvider) {
+ public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) {
this.bufferedWriter = bufferedWriter;
this.bufferProvider = bufferProvider;
}
@@ -61,16 +64,24 @@
}
public void write(ByteBuffer header, ByteBuffer page) throws HyracksDataException {
- open();
write(header);
write(page);
}
public int write(ByteBuffer page) throws HyracksDataException {
open();
+ return write(page.array(), 0, page.limit());
+ }
- // amount to write
- int size = page.limit();
+ public void write(int b) throws HyracksDataException {
+ if (writeBuffer.remaining() == 0) {
+ uploadAndWait();
+ }
+ writeBuffer.put((byte) b);
+ }
+
+ public int write(byte[] b, int off, int len) throws HyracksDataException {
+ open();
// full buffer = upload -> write all
if (writeBuffer.remaining() == 0) {
@@ -78,23 +89,23 @@
}
// write partial -> upload -> write -> upload -> ...
- int offset = 0;
- int pageRemaining = size;
+ int offset = off;
+ int pageRemaining = len;
while (pageRemaining > 0) {
// enough to write all
if (writeBuffer.remaining() > pageRemaining) {
- writeBuffer.put(page.array(), offset, pageRemaining);
- return size;
+ writeBuffer.put(b, offset, pageRemaining);
+ return len;
}
int remaining = writeBuffer.remaining();
- writeBuffer.put(page.array(), offset, remaining);
+ writeBuffer.put(b, offset, remaining);
pageRemaining -= remaining;
offset += remaining;
uploadAndWait();
}
- return size;
+ return len;
}
public void finish() throws HyracksDataException {
@@ -128,6 +139,7 @@
try {
bufferedWriter.upload(this, writeBuffer.limit());
} catch (Exception e) {
+ LOGGER.fatal(e);
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
new file mode 100644
index 0000000..693b73a
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.nio.ByteBuffer;
+
+public interface IWriteBufferProvider {
+ ByteBuffer getBuffer();
+
+ void recycle(ByteBuffer buffer);
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
index 5e49be3..ee17400 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
@@ -24,17 +24,22 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-public class WriteBufferProvider {
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class WriteBufferProvider implements IWriteBufferProvider {
private final BlockingQueue<ByteBuffer> writeBuffers;
public WriteBufferProvider(int ioParallelism) {
writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
}
+ @Override
public void recycle(ByteBuffer buffer) {
writeBuffers.offer(buffer);
}
+ @Override
public ByteBuffer getBuffer() {
ByteBuffer writeBuffer = writeBuffers.poll();
if (writeBuffer == null) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
new file mode 100644
index 0000000..287900d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public class WriterSingleBufferProvider implements IWriteBufferProvider {
+
+ private final ByteBuffer buffer;
+
+ public WriterSingleBufferProvider() {
+ buffer = ByteBuffer.allocate(MIN_BUFFER_SIZE);
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ buffer.clear();
+ return buffer;
+ }
+
+ @Override
+ public void recycle(ByteBuffer buffer) {
+ // NoOp
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 56ed3cf..a2aa21c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -18,13 +18,18 @@
*/
package org.apache.asterix.cloud.clients.aws.s3;
+import java.io.Serializable;
+import java.util.Map;
+
import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-public class S3ClientConfig {
+public final class S3ClientConfig implements Serializable {
+ private static final long serialVersionUID = 548292720313565948L;
// The maximum number of file that can be deleted (AWS restriction)
static final int DELETE_BATCH_SIZE = 1000;
private final String region;
@@ -48,6 +53,21 @@
cloudProperties.getProfilerLogInterval());
}
+ public static S3ClientConfig of(Map<String, String> configuration) {
+ // Used to determine local vs. actual S3
+ String endPoint = configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME);
+ // Disabled
+ long profilerLogInterval = 0;
+
+ // Dummy values;
+ String region = "";
+ String prefix = null;
+ boolean anonymousAuth = false;
+
+ return new S3ClientConfig(region, configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME), "",
+ anonymousAuth, profilerLogInterval);
+ }
+
public String getRegion() {
return region;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 02e1d4f..b47a6ff 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -19,8 +19,8 @@
package org.apache.asterix.cloud.clients.aws.s3;
import static org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
-import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.encodeURI;
-import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.listS3Objects;
+import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.encodeURI;
+import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.listS3Objects;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -45,6 +45,7 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -67,21 +68,25 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
+@ThreadSafe
public class S3CloudClient implements ICloudClient {
private final S3ClientConfig config;
private final S3Client s3Client;
private final IRequestProfiler profiler;
public S3CloudClient(S3ClientConfig config) {
+ this(config, buildClient(config));
+ }
+
+ public S3CloudClient(S3ClientConfig config, S3Client s3Client) {
this.config = config;
- s3Client = buildClient();
+ this.s3Client = s3Client;
long profilerInterval = config.getProfilerLogInterval();
if (profilerInterval > 0) {
profiler = new CountRequestProfiler(profilerInterval);
} else {
profiler = NoOpRequestProfiler.INSTANCE;
}
-
}
@Override
@@ -245,7 +250,7 @@
s3Client.close();
}
- private S3Client buildClient() {
+ private static S3Client buildClient(S3ClientConfig config) {
S3ClientBuilder builder = S3Client.builder();
builder.credentialsProvider(config.createCredentialsProvider());
builder.region(Region.of(config.getRegion()));
@@ -264,7 +269,7 @@
private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
Set<String> files = new HashSet<>();
for (S3Object s3Object : contents) {
- String path = config.isLocalS3Provider() ? S3Utils.decodeURI(s3Object.key()) : s3Object.key();
+ String path = config.isLocalS3Provider() ? S3CloudClientUtils.decodeURI(s3Object.key()) : s3Object.key();
if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
files.add(path);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
similarity index 95%
rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java
rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
index 8901dec..82b0ae1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
@@ -31,9 +31,9 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
-public class S3Utils {
+public class S3CloudClientUtils {
- private S3Utils() {
+ private S3CloudClientUtils() {
throw new AssertionError("do not instantiate");
}
@@ -78,7 +78,7 @@
return URLDecoder.decode(path, Charset.defaultCharset());
}
- public static String toCloudPrefix(String path) {
+ private static String toCloudPrefix(String path) {
return path.startsWith(File.separator) ? path.substring(1) : path;
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
new file mode 100644
index 0000000..22095ca
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.CloudOutputStream;
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class CloudExternalFileWriter implements IExternalFileWriter {
+ private final IExternalFilePrinter printer;
+ private final ICloudClient cloudClient;
+ private final String bucket;
+ private final IWriteBufferProvider bufferProvider;
+ private ICloudBufferedWriter bufferedWriter;
+
+ public CloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket) {
+ this.printer = printer;
+ this.cloudClient = cloudClient;
+ this.bucket = bucket;
+ bufferProvider = new WriterSingleBufferProvider();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.open();
+ }
+
+ @Override
+ public void newFile(String path) throws HyracksDataException {
+ bufferedWriter = cloudClient.createBufferedWriter(bucket, path);
+ CloudResettableInputStream inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
+
+ CloudOutputStream outputStream = new CloudOutputStream(inputStream);
+ printer.newStream(outputStream);
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ printer.print(value);
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ bufferedWriter.abort();
+ printer.close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ printer.close();
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
new file mode 100644
index 0000000..204d5da
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
+import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
+
+public final class S3ExternalFileWriterFactory implements IExternalFileWriterFactory {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 4551318140901866805L;
+ public static final IExternalFileFilterWriterFactoryProvider PROVIDER = S3ExternalFileWriterFactory::new;
+ private final S3ClientConfig config;
+ private final Map<String, String> configuration;
+ private transient S3CloudClient cloudClient;
+
+ private S3ExternalFileWriterFactory(Map<String, String> configuration) {
+ this.config = S3ClientConfig.of(configuration);
+ this.configuration = configuration;
+ cloudClient = null;
+ }
+
+ @Override
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ throws HyracksDataException {
+ buildClient();
+ String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ return new CloudExternalFileWriter(printerFactory.createPrinter(), cloudClient, bucket);
+ }
+
+ private void buildClient() throws HyracksDataException {
+ try {
+ synchronized (this) {
+ if (cloudClient == null) {
+ // only a single client should be build
+ cloudClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
+ }
+ }
+ } catch (CompilationException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public char getFileSeparator() {
+ return '/';
+ }
+
+ @Override
+ public void validate() throws AlgebricksException {
+ ICloudClient testClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
+ String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ if (bucket == null || bucket.isEmpty()) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+ ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ }
+ try {
+ doValidate(testClient, bucket);
+ } catch (IOException e) {
+ if (e.getCause() instanceof NoSuchBucketException) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
+ } else {
+ LOGGER.fatal(e);
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ ExceptionUtils.getMessageOrToString(e));
+ }
+ }
+ }
+
+ private static void doValidate(ICloudClient testClient, String bucket) throws IOException {
+ Random random = new Random();
+ String pathPrefix = "testFile";
+ String path = pathPrefix + random.nextInt();
+ while (testClient.exists(bucket, path)) {
+ path = pathPrefix + random.nextInt();
+ }
+
+ long writeValue = random.nextLong();
+ byte[] data = new byte[Long.BYTES];
+ LongPointable.setLong(data, 0, writeValue);
+ ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, path);
+ CloudResettableInputStream stream = null;
+ boolean aborted = false;
+ try {
+ stream = new CloudResettableInputStream(writer, new WriterSingleBufferProvider());
+ stream.write(data, 0, data.length);
+ } catch (HyracksDataException e) {
+ stream.abort();
+ } finally {
+ if (stream != null && !aborted) {
+ stream.finish();
+ stream.close();
+ }
+ }
+
+ try {
+ long readValue = LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+ if (writeValue != readValue) {
+ // This should never happen unless S3 is messed up. But log for sanity check
+ LOGGER.warn(
+ "The writer can write but the written values wasn't successfully read back (wrote: {}, read:{})",
+ writeValue, readValue);
+ }
+ } finally {
+ // Delete the written file
+ testClient.deleteObjects(bucket, Collections.singleton(path));
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
new file mode 100644
index 0000000..e8983d8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class LocalFSExternalFileWriter implements IExternalFileWriter {
+ private final IExternalFilePrinter printer;
+
+ LocalFSExternalFileWriter(IExternalFilePrinter printer) {
+ this.printer = printer;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.open();
+ }
+
+ @Override
+ public void newFile(String path) throws HyracksDataException {
+ try {
+ File currentFile = new File(path);
+ if (currentFile.exists()) {
+ currentFile.delete();
+ }
+ FileUtils.createParentDirectories(currentFile);
+ currentFile.createNewFile();
+ printer.newStream(new BufferedOutputStream(new FileOutputStream(currentFile)));
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ printer.print(value);
+ }
+
+ @Override
+ public void abort() {
+ printer.close();
+ }
+
+ @Override
+ public void close() {
+ printer.close();
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
new file mode 100644
index 0000000..d206b2a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.File;
+
+import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public final class LocalFSExternalFileWriterFactory implements IExternalFileWriterFactory {
+ private static final long serialVersionUID = 871685327574547749L;
+ public static final IExternalFileFilterWriterFactoryProvider PROVIDER = c -> new LocalFSExternalFileWriterFactory();
+
+ private LocalFSExternalFileWriterFactory() {
+ }
+
+ @Override
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory) {
+ return new LocalFSExternalFileWriter(printerFactory.createPrinter());
+ }
+
+ @Override
+ public char getFileSeparator() {
+ return File.separatorChar;
+ }
+
+ @Override
+ public void validate() {
+ // NoOp
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
new file mode 100644
index 0000000..5ef196d
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.compressor;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
+ private static final long serialVersionUID = -7364595253362922025L;
+ public static IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory();
+
+ private GzipExternalFileCompressStreamFactory() {
+ }
+
+ @Override
+ public OutputStream createStream(OutputStream outputStream) throws HyracksDataException {
+ try {
+ return new GzipCompressorOutputStream(outputStream);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java
new file mode 100644
index 0000000..e46e5c0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.compressor;
+
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Creates a compression {@link OutputStream}
+ */
+@FunctionalInterface
+public interface IExternalFileCompressStreamFactory extends Serializable {
+
+ /**
+ * Create a compressed stream before writing to the provided stream
+ *
+ * @param outputStream destination output stream
+ * @return compressing stream
+ */
+ OutputStream createStream(OutputStream outputStream) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java
new file mode 100644
index 0000000..8876e0f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.compressor;
+
+import java.io.OutputStream;
+
+public class NoOpExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
+ private static final long serialVersionUID = -2211142209501287615L;
+ public static final IExternalFileCompressStreamFactory INSTANCE = new NoOpExternalFileCompressStreamFactory();
+
+ private NoOpExternalFileCompressStreamFactory() {
+ }
+
+ @Override
+ public OutputStream createStream(OutputStream outputStream) {
+ return outputStream;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
new file mode 100644
index 0000000..bff0db8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class TextualExternalFilePrinter implements IExternalFilePrinter {
+ private final IPrinter printer;
+ private final IExternalFileCompressStreamFactory compressStreamFactory;
+ private PrintStream printStream;
+
+ TextualExternalFilePrinter(IPrinter printer, IExternalFileCompressStreamFactory compressStreamFactory) {
+ this.printer = printer;
+ this.compressStreamFactory = compressStreamFactory;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.init();
+ }
+
+ @Override
+ public void newStream(OutputStream outputStream) throws HyracksDataException {
+ if (printStream != null) {
+ printStream.close();
+ }
+ printStream = new PrintStream(compressStreamFactory.createStream(outputStream));
+ }
+
+ @Override
+ public void print(IValueReference value) throws HyracksDataException {
+ printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
+ printStream.println();
+ }
+
+ @Override
+ public void close() {
+ printStream.close();
+ if (printStream.checkError()) {
+ throw new IllegalStateException("Print error. Check the logs for more information");
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
new file mode 100644
index 0000000..6778532
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+public class TextualExternalFilePrinterFactory implements IExternalFilePrinterFactory {
+ private static final long serialVersionUID = 9155959967258587588L;
+ private final IPrinterFactory printerFactory;
+ private final IExternalFileCompressStreamFactory compressStreamFactory;
+
+ public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
+ IExternalFileCompressStreamFactory compressStreamFactory) {
+ this.printerFactory = printerFactory;
+ this.compressStreamFactory = compressStreamFactory;
+ }
+
+ @Override
+ public IExternalFilePrinter createPrinter() {
+ return new TextualExternalFilePrinter(printerFactory.createPrinter(), compressStreamFactory);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
index 7a017bd..deb0da9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
@@ -62,6 +62,7 @@
private final UTF8StringWriter utf8Writer;
public static final IPointable NULL_REF = new VoidPointable();
+
static {
NULL_REF.set(NULL_BYTES, 0, NULL_BYTES.length);
}
@@ -139,12 +140,9 @@
}
/**
- * @param str
- * The input string
- * @param vs
- * The storage buffer
- * @param writeTag
- * Specifying whether a tag for the string should also be written
+ * @param str The input string
+ * @param vs The storage buffer
+ * @param writeTag Specifying whether a tag for the string should also be written
*/
public void serializeString(String str, IMutableValueStorage vs, boolean writeTag) throws HyracksDataException {
vs.reset();
@@ -219,22 +217,21 @@
* This method takes multiple pointables, the first pointable being the pointable to write the result to, and
* checks their ATypeTag value. If a missing or null ATypeTag is encountered, the method will set the result
* pointable to missing or null accordingly, and will return {@code true}.
- *
+ * <p>
* As the missing encounter has a higher priority than the null, the method will keep checking if any missing has
* been encountered first, if not, it will do a null check at the end.
- *
+ * <p>
* If the listAccessor is passed, this method will also go through any list pointable elements and search for
* a missing value to give it a higher priority over null values. If {@code null} is passed for the listAccessor,
* the list element check will be skipped.
*
- * @param result the result pointable that will hold the data
+ * @param result the result pointable that will hold the data
* @param listAccessor list accessor to use for check list elements.
- * @param pointable1 the first pointable to be checked
- * @param pointable2 the second pointable to be checked
- * @param pointable3 the third pointable to be checked
- * @param pointable4 the fourth pointable to be checked
- * @param pointable5 the fourth pointable to be checked
- *
+ * @param pointable1 the first pointable to be checked
+ * @param pointable2 the second pointable to be checked
+ * @param pointable3 the third pointable to be checked
+ * @param pointable4 the fourth pointable to be checked
+ * @param pointable5 the fourth pointable to be checked
* @return {@code true} if the pointable value is missing or null, {@code false} otherwise.
*/
public static boolean checkAndSetMissingOrNull(IPointable result, ListAccessor listAccessor, IPointable pointable1,
@@ -311,9 +308,8 @@
* Checks whether the pointable {@param pointable1} is null or missing, and if true, assigns null to the
* {@param result}.
*
- * @param result the result pointable that will hold the null value
+ * @param result the result pointable that will hold the null value
* @param pointable1 the pointable to be checked
- *
* @return {@code true} if the {@param pointable1} value is missing or null, {@code false} otherwise.
*/
public static boolean checkAndSetNull(IPointable result, IPointable pointable1) throws HyracksDataException {
@@ -331,10 +327,9 @@
* Checks whether any pointable argument is null or missing, and if true, assigns null to the
* {@param result}.
*
- * @param result the result pointable that will hold the null value
+ * @param result the result pointable that will hold the null value
* @param pointable1 the pointable to be checked
* @param pointable2 the pointable to be checked
- *
* @return {@code true} if any pointable is missing or null, {@code false} otherwise.
*/
public static boolean checkAndSetNull(IPointable result, IPointable pointable1, IPointable pointable2)
@@ -344,14 +339,13 @@
/**
* This method checks and returns the pointable value state.
- *
+ * <p>
* If a ListAccessor is passed to this function, it will check if the passed pointable is a list, and if so, it
* will search for a missing value inside the list before checking for null values. If the listAccessor value is
* null, no list elements check will be performed.
*
- * @param pointable the pointable to be checked
+ * @param pointable the pointable to be checked
* @param listAccessor list accessor used to check the list elements.
- *
* @return the pointable value state for the passed pointable
*/
private static PointableValueState getPointableValueState(IPointable pointable, ListAccessor listAccessor)
@@ -400,10 +394,9 @@
* Check if the provided bytes are of valid long type. In case floats and doubles are accepted, the accepted
* values will be 1.0 and 2.0, but not 2.5. (only zero decimals)
*
- * @param bytes data bytes
- * @param startOffset start offset
+ * @param bytes data bytes
+ * @param startOffset start offset
* @param acceptFloatAndDouble flag to accept float and double values or not
- *
* @return true if provided value is a valid long, false otherwise
*/
public static boolean isValidLongValue(byte[] bytes, int startOffset, boolean acceptFloatAndDouble) {
@@ -443,4 +436,9 @@
return true;
}
+
+ public static boolean isNullOrMissing(IValueReference value) {
+ byte typeTag = value.getByteArray()[0];
+ return ATypeTag.MISSING.serialize() == typeTag || ATypeTag.NULL.serialize() == typeTag;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
new file mode 100644
index 0000000..88153de
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+abstract class AbstractPathResolver implements IPathResolver {
+ // TODO is 4-digits enough?
+ // TODO do we need jobId?
+ //4-digit format for the partition number, jobId, and file counter
+ private static final int NUMBER_OF_DIGITS = 4;
+ private static final String FILE_FORMAT = "%0" + NUMBER_OF_DIGITS + "d";
+
+ private final String fileExtension;
+ private final char fileSeparator;
+ private final int partition;
+ private final long jobId;
+ private final StringBuilder pathStringBuilder;
+ private int fileCounter;
+
+ AbstractPathResolver(String fileExtension, char fileSeparator, int partition, long jobId) {
+ this.fileExtension = fileExtension;
+ this.fileSeparator = fileSeparator;
+ this.partition = partition;
+ this.jobId = jobId;
+ pathStringBuilder = new StringBuilder();
+ fileCounter = 0;
+ }
+
+ @Override
+ public final String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException {
+ fileCounter = 0;
+ pathStringBuilder.setLength(0);
+ appendPrefix(pathStringBuilder, tuple);
+ if (pathStringBuilder.charAt(pathStringBuilder.length() - 1) != fileSeparator) {
+ pathStringBuilder.append(fileSeparator);
+ }
+ pathStringBuilder.append(String.format(FILE_FORMAT, partition));
+ pathStringBuilder.append('-');
+ pathStringBuilder.append(String.format(FILE_FORMAT, jobId));
+ pathStringBuilder.append('-');
+ pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
+ if (fileExtension != null && !fileExtension.isEmpty()) {
+ pathStringBuilder.append('.');
+ pathStringBuilder.append(fileExtension);
+ }
+ return pathStringBuilder.toString();
+ }
+
+ @Override
+ public final String getNextPath() {
+ int numOfCharToRemove = NUMBER_OF_DIGITS;
+ if (fileExtension != null && !fileExtension.isEmpty()) {
+ numOfCharToRemove += 1 + fileExtension.length();
+ }
+ pathStringBuilder.setLength(pathStringBuilder.length() - numOfCharToRemove);
+ pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
+ if (fileExtension != null && !fileExtension.isEmpty()) {
+ pathStringBuilder.append('.');
+ pathStringBuilder.append(fileExtension);
+ }
+ return pathStringBuilder.toString();
+ }
+
+ abstract void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
new file mode 100644
index 0000000..dbd97e4
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.UTFDataFormatException;
+
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.util.string.UTF8CharBuffer;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+final class DynamicPathResolver extends AbstractPathResolver {
+ private final IScalarEvaluator pathEval;
+ private final String inappropriatePartitionPath;
+ private final VoidPointable pathResult;
+ private final UTF8CharBuffer charBuffer;
+
+ DynamicPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, IScalarEvaluator pathEval,
+ String inappropriatePartitionPath, IWarningCollector warningCollector) {
+ super(fileExtension, fileSeparator, partition, jobId);
+ this.pathEval = pathEval;
+ this.inappropriatePartitionPath = inappropriatePartitionPath;
+ pathResult = new VoidPointable();
+ charBuffer = new UTF8CharBuffer();
+ }
+
+ @Override
+ void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
+ pathEval.evaluate(tuple, pathResult);
+ if (PointableHelper.isNullOrMissing(pathResult)) {
+ // TODO warn
+ pathStringBuilder.append(inappropriatePartitionPath);
+ return;
+ }
+
+ try {
+ UTF8StringUtil.readUTF8(pathResult.getByteArray(), pathResult.getStartOffset() + 1, charBuffer);
+ } catch (UTFDataFormatException e) {
+ throw HyracksDataException.create(e);
+ }
+ pathStringBuilder.append(charBuffer.getBuffer(), 0, charBuffer.getFilledLength());
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
new file mode 100644
index 0000000..a4af805
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+final class ExternalWriter implements IExternalWriter {
+ private final IPathResolver pathResolver;
+ private final IExternalFileWriter writer;
+ private final int maxResultPerFile;
+ private int tupleCounter;
+
+ public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
+ this.pathResolver = pathResolver;
+ this.writer = writer;
+ this.maxResultPerFile = maxResultPerFile;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
+ tupleCounter = 0;
+ writer.newFile(pathResolver.getPartitionPath(tuple));
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ writer.write(value);
+ tupleCounter++;
+ if (tupleCounter >= maxResultPerFile) {
+ tupleCounter = 0;
+ writer.newFile(pathResolver.getNextPath());
+ }
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ writer.abort();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
new file mode 100644
index 0000000..350c5d6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class ExternalWriterFactory implements IExternalWriterFactory {
+ private static final long serialVersionUID = 1412969574113419638L;
+ private final IExternalFileWriterFactory writerFactory;
+ private final IExternalFilePrinterFactory printerFactory;
+ private final String fileExtension;
+ private final int maxResult;
+ private final IScalarEvaluatorFactory pathEvalFactory;
+ private final String inappropriatePartitionPath;
+ private final String staticPath;
+
+ public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
+ String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory,
+ String inappropriatePartitionPath, String staticPath) {
+ this.writerFactory = writerFactory;
+ this.printerFactory = printerFactory;
+ this.fileExtension = fileExtension;
+ this.maxResult = maxResult;
+ this.pathEvalFactory = pathEvalFactory;
+ this.inappropriatePartitionPath = inappropriatePartitionPath;
+ this.staticPath = staticPath;
+ }
+
+ @Override
+ public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
+ int partition = context.getTaskAttemptId().getTaskId().getPartition();
+ long jobId = context.getJobletContext().getJobId().getId();
+ char fileSeparator = writerFactory.getFileSeparator();
+ IPathResolver resolver;
+ if (staticPath == null) {
+ EvaluatorContext evaluatorContext = new EvaluatorContext(context);
+ IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
+ IWarningCollector warningCollector = context.getWarningCollector();
+ resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, jobId, pathEval,
+ inappropriatePartitionPath, warningCollector);
+ } else {
+ resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, jobId, staticPath);
+ }
+ IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
+ return new ExternalWriter(resolver, writer, maxResult);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
new file mode 100644
index 0000000..2de5e11
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.util.Map;
+
+@FunctionalInterface
+public interface IExternalFileFilterWriterFactoryProvider {
+ IExternalFileWriterFactory create(Map<String, String> configuration);
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
new file mode 100644
index 0000000..fb89f4a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.OutputStream;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * An {@link IExternalFileWriter} printer
+ */
+public interface IExternalFilePrinter {
+
+ /**
+ * Open the printer
+ */
+ void open() throws HyracksDataException;
+
+ /**
+ * Initialize the printer with a new stream
+ *
+ * @param outputStream to print to
+ */
+ void newStream(OutputStream outputStream) throws HyracksDataException;
+
+ /**
+ * Print the provided value
+ *
+ * @param value to print
+ */
+ void print(IValueReference value) throws HyracksDataException;
+
+ /**
+ * Flush and close the printer
+ */
+ void close();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
new file mode 100644
index 0000000..a4fa97b
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.Serializable;
+
+/**
+ * {@link IExternalFileWriter} printer factory
+ */
+public interface IExternalFilePrinterFactory extends Serializable {
+ /**
+ * @return a new external file printer
+ */
+ IExternalFilePrinter createPrinter();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
new file mode 100644
index 0000000..c07e8cb
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * A file writer
+ */
+public interface IExternalFileWriter {
+
+ /**
+ * Open the writer
+ */
+ void open() throws HyracksDataException;
+
+ /**
+ * Initialize the writer to write to a new path
+ *
+ * @param path of the file to writer (including the file name)
+ */
+ void newFile(String path) throws HyracksDataException;
+
+ /**
+ * Writer the provided value
+ *
+ * @param value to write
+ */
+ void write(IValueReference value) throws HyracksDataException;
+
+ /**
+ * Run the abort sequence in case of a failure
+ */
+ void abort() throws HyracksDataException;
+
+ /**
+ * Flush the final result and close the writer
+ */
+ void close() throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
new file mode 100644
index 0000000..2ab6dac
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface for writing to a storage device
+ * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
+ */
+public interface IExternalFileWriterFactory extends Serializable {
+ /**
+ * Create a writer
+ *
+ * @param context task context
+ * @param printerFactory printer factory for writing the final result
+ * @return a new file writer
+ */
+ IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ throws HyracksDataException;
+
+ /**
+ * @return file (or path) separator
+ */
+ char getFileSeparator();
+
+ /**
+ * Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
+ */
+ void validate() throws AlgebricksException;
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
new file mode 100644
index 0000000..2419210
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Path resolver which generates paths for the written files
+ */
+interface IPathResolver {
+
+ /**
+ * Extract the partitioning values from the provided tuple and generates the file path
+ *
+ * @param tuple contains the partitioning values
+ * @return the final path which includes the partitioning values
+ */
+ String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException;
+
+ /**
+ * @return the path of the next file to be written for the same partition
+ */
+ String getNextPath();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
new file mode 100644
index 0000000..34c6575
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+final class StaticPathResolver extends AbstractPathResolver {
+ private final String prefix;
+
+ StaticPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, String prefix) {
+ super(fileExtension, fileSeparator, partition, jobId);
+ this.prefix = prefix;
+ }
+
+ @Override
+ void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
+ pathStringBuilder.append(prefix);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
new file mode 100644
index 0000000..e96531f
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+ private final int sourceColumn;
+ private final int[] partitionColumns;
+ private final IPointable sourceValue;
+ private final PointableTupleReference partitionColumnsPrevCopy;
+ private final PermutingFrameTupleReference partitionColumnsRef;
+ private final IBinaryComparator[] partitionComparators;
+ private final IExternalWriter writer;
+ private FrameTupleAccessor tupleAccessor;
+ private FrameTupleReference tupleRef;
+ private boolean first;
+
+ SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
+ RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+ this.sourceColumn = sourceColumn;
+ this.partitionColumns = partitionColumns;
+ this.sourceValue = new VoidPointable();
+ partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
+ partitionColumnsPrevCopy =
+ PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
+ this.partitionComparators = partitionComparators;
+ this.inputRecordDesc = inputRecordDesc;
+ this.writer = writer;
+ first = true;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (tupleAccessor == null) {
+ writer.open();
+ tupleAccessor = new FrameTupleAccessor(inputRecordDesc);
+ tupleRef = new FrameTupleReference();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tupleAccessor.reset(buffer);
+ for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
+ tupleRef.reset(tupleAccessor, i);
+ if (isNewPartition(i)) {
+ writer.initNewPartition(tupleRef);
+ }
+ setValue(tupleRef, sourceColumn, sourceValue);
+ writer.write(sourceValue);
+ partitionColumnsRef.reset(tupleAccessor, i);
+ partitionColumnsPrevCopy.set(partitionColumnsRef);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.abort();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ private boolean isNewPartition(int index) throws HyracksDataException {
+ if (first) {
+ first = false;
+ return true;
+ }
+
+ return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor, index, partitionColumns,
+ partitionComparators);
+ }
+
+ private void setValue(IFrameTupleReference tuple, int column, IPointable value) {
+ byte[] data = tuple.getFieldData(column);
+ int start = tuple.getFieldStart(column);
+ int length = tuple.getFieldLength(column);
+ value.set(data, start, length);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
new file mode 100644
index 0000000..6220dec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class SinkExternalWriterRuntimeFactory extends AbstractPushRuntimeFactory {
+ private static final long serialVersionUID = -2215789207336628581L;
+ private final int sourceColumn;
+ private final int[] partitionColumn;
+ private final IBinaryComparatorFactory[] partitionComparatorFactories;
+ private final RecordDescriptor inputRecordDescriptor;
+ private final IExternalWriterFactory writerFactory;
+
+ public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumn,
+ IBinaryComparatorFactory[] partitionComparatorFactories, RecordDescriptor inputRecordDescriptor,
+ IExternalWriterFactory writerFactory) {
+ this.sourceColumn = sourceColumn;
+ this.partitionColumn = partitionColumn;
+ this.partitionComparatorFactories = partitionComparatorFactories;
+ this.inputRecordDescriptor = inputRecordDescriptor;
+ this.writerFactory = writerFactory;
+ }
+
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ IExternalWriter writer = writerFactory.createWriter(ctx);
+ IBinaryComparator[] partitionComparators = new IBinaryComparator[partitionComparatorFactories.length];
+ for (int i = 0; i < partitionComparatorFactories.length; i++) {
+ partitionComparators[i] = partitionComparatorFactories[i].createBinaryComparator();
+ }
+ SinkExternalWriterRuntime runtime = new SinkExternalWriterRuntime(sourceColumn, partitionColumn,
+ partitionComparators, inputRecordDescriptor, writer);
+ return new IPushRuntime[] { runtime };
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
new file mode 100644
index 0000000..81ed880
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.writers;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An external writer of a query (or dataset) result
+ */
+public interface IExternalWriter {
+ /**
+ * Open the writer
+ */
+ void open() throws HyracksDataException;
+
+ /**
+ * Initialize the writer for a new partition
+ *
+ * @param tuple which contains the partitioning columns
+ */
+ void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
+
+ /**
+ * Write the provided value
+ *
+ * @param value to be written
+ */
+ void write(IValueReference value) throws HyracksDataException;
+
+ /**
+ * Run the abort sequence in case of a failure
+ */
+ void abort() throws HyracksDataException;
+
+ /**
+ * Flush the final result and close the writer
+ */
+ void close() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
new file mode 100644
index 0000000..e6899a6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.writers;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A writer factory which creates a writer for the result of a query (or dataset) to a storage device
+ */
+@FunctionalInterface
+public interface IExternalWriterFactory extends Serializable {
+ /**
+ * Crete a new writer
+ *
+ * @param context task context
+ * @return new writer
+ */
+ IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java
new file mode 100644
index 0000000..30ebf69
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.string;
+
+public class UTF8CharBuffer {
+ private char[] buffer;
+ private int filledLength;
+
+ public char[] getBuffer() {
+ return buffer;
+ }
+
+ public int getFilledLength() {
+ return filledLength;
+ }
+
+ char[] getBuffer(int requiredLength) {
+ if (buffer == null || buffer.length < requiredLength) {
+ buffer = new char[requiredLength];
+ }
+
+ return buffer;
+ }
+
+ void setFilledLength(int filledLength) {
+ this.filledLength = filledLength;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index cde79cb..4d3aced 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -26,6 +26,7 @@
import java.io.OutputStream;
import java.io.UTFDataFormatException;
import java.lang.ref.SoftReference;
+import java.util.Objects;
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
@@ -535,14 +536,29 @@
chararr = reader.chararr;
}
+ in.readFully(bytearr, 0, utflen);
+
+ int chararr_count = readUTF8(bytearr, 0, utflen, chararr);
+
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararr_count);
+ }
+
+ public static void readUTF8(byte[] bytearr, int start, UTF8CharBuffer buffer) throws UTFDataFormatException {
+ Objects.requireNonNull(buffer);
+ int utflen = VarLenIntEncoderDecoder.decode(bytearr, start);
+ int lengthIntSize = VarLenIntEncoderDecoder.getBytesRequired(utflen);
+ char[] chararr = buffer.getBuffer(utflen);
+ buffer.setFilledLength(readUTF8(bytearr, start + lengthIntSize, utflen, chararr));
+ }
+
+ private static int readUTF8(byte[] bytearr, int start, int utflen, char[] chararr) throws UTFDataFormatException {
int c, char2, char3;
int count = 0;
int chararr_count = 0;
- in.readFully(bytearr, 0, utflen);
-
while (count < utflen) {
- c = bytearr[count] & 0xff;
+ c = bytearr[start + count] & 0xff;
if (c > 127) {
break;
}
@@ -596,8 +612,7 @@
throw new UTFDataFormatException("malformed input around byte " + count);
}
}
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararr_count);
+ return chararr_count;
}
/**