[ASTERIXDB-3288][RT] Introduce COPY TO runtime

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Introduce the necessary writers for COPY TO

Change-Id: Ie107e707189bdb7fc17b09439a08d47192cfa61f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17878
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 9e840c2..79ee4c1 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -70,6 +70,11 @@
       <artifactId>asterix-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-external-data</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <!-- aws s3 start -->
     <dependency>
       <groupId>software.amazon.awssdk</groupId>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 8c7cd8a..dc8bc68 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -59,7 +59,7 @@
     //TODO(DB): change
     private final String metadataNamespacePath;
     protected final ICloudClient cloudClient;
-    protected final WriteBufferProvider writeBufferProvider;
+    protected final IWriteBufferProvider writeBufferProvider;
     protected final String bucket;
     protected final Set<Integer> partitions;
     protected final List<FileReference> partitionPaths;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 8572014..14c44ad 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -30,7 +30,7 @@
     private final CloudResettableInputStream inputStream;
 
     public CloudFileHandle(ICloudClient cloudClient, String bucket, FileReference fileRef,
-            WriteBufferProvider bufferProvider) {
+            IWriteBufferProvider bufferProvider) {
         super(fileRef);
         ICloudBufferedWriter bufferedWriter = cloudClient.createBufferedWriter(bucket, fileRef.getRelativePath());
         inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
new file mode 100644
index 0000000..18a97ad
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class CloudOutputStream extends OutputStream {
+    private final CloudResettableInputStream inputStream;
+
+    public CloudOutputStream(CloudResettableInputStream inputStream) {
+        this.inputStream = inputStream;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        inputStream.write(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        inputStream.write(b);
+    }
+
+    @Override
+    public void close() throws IOException {
+        inputStream.finish();
+    }
+
+    public void abort() throws IOException {
+        inputStream.abort();
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 7ba95c5..f198001 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -24,16 +24,19 @@
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class CloudResettableInputStream extends InputStream {
+    private static final Logger LOGGER = LogManager.getLogger();
     // TODO: make configurable
     public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024;
-    private final WriteBufferProvider bufferProvider;
+    private final IWriteBufferProvider bufferProvider;
     private ByteBuffer writeBuffer;
 
     private final ICloudBufferedWriter bufferedWriter;
 
-    public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, WriteBufferProvider bufferProvider) {
+    public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) {
         this.bufferedWriter = bufferedWriter;
         this.bufferProvider = bufferProvider;
     }
@@ -61,16 +64,24 @@
     }
 
     public void write(ByteBuffer header, ByteBuffer page) throws HyracksDataException {
-        open();
         write(header);
         write(page);
     }
 
     public int write(ByteBuffer page) throws HyracksDataException {
         open();
+        return write(page.array(), 0, page.limit());
+    }
 
-        // amount to write
-        int size = page.limit();
+    public void write(int b) throws HyracksDataException {
+        if (writeBuffer.remaining() == 0) {
+            uploadAndWait();
+        }
+        writeBuffer.put((byte) b);
+    }
+
+    public int write(byte[] b, int off, int len) throws HyracksDataException {
+        open();
 
         // full buffer = upload -> write all
         if (writeBuffer.remaining() == 0) {
@@ -78,23 +89,23 @@
         }
 
         // write partial -> upload -> write -> upload -> ...
-        int offset = 0;
-        int pageRemaining = size;
+        int offset = off;
+        int pageRemaining = len;
         while (pageRemaining > 0) {
             // enough to write all
             if (writeBuffer.remaining() > pageRemaining) {
-                writeBuffer.put(page.array(), offset, pageRemaining);
-                return size;
+                writeBuffer.put(b, offset, pageRemaining);
+                return len;
             }
 
             int remaining = writeBuffer.remaining();
-            writeBuffer.put(page.array(), offset, remaining);
+            writeBuffer.put(b, offset, remaining);
             pageRemaining -= remaining;
             offset += remaining;
             uploadAndWait();
         }
 
-        return size;
+        return len;
     }
 
     public void finish() throws HyracksDataException {
@@ -128,6 +139,7 @@
         try {
             bufferedWriter.upload(this, writeBuffer.limit());
         } catch (Exception e) {
+            LOGGER.fatal(e);
             throw HyracksDataException.create(e);
         }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
new file mode 100644
index 0000000..693b73a
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.nio.ByteBuffer;
+
+public interface IWriteBufferProvider {
+    ByteBuffer getBuffer();
+
+    void recycle(ByteBuffer buffer);
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
index 5e49be3..ee17400 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java
@@ -24,17 +24,22 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-public class WriteBufferProvider {
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class WriteBufferProvider implements IWriteBufferProvider {
     private final BlockingQueue<ByteBuffer> writeBuffers;
 
     public WriteBufferProvider(int ioParallelism) {
         writeBuffers = new ArrayBlockingQueue<>(ioParallelism);
     }
 
+    @Override
     public void recycle(ByteBuffer buffer) {
         writeBuffers.offer(buffer);
     }
 
+    @Override
     public ByteBuffer getBuffer() {
         ByteBuffer writeBuffer = writeBuffers.poll();
         if (writeBuffer == null) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
new file mode 100644
index 0000000..287900d
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public class WriterSingleBufferProvider implements IWriteBufferProvider {
+
+    private final ByteBuffer buffer;
+
+    public WriterSingleBufferProvider() {
+        buffer = ByteBuffer.allocate(MIN_BUFFER_SIZE);
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        buffer.clear();
+        return buffer;
+    }
+
+    @Override
+    public void recycle(ByteBuffer buffer) {
+        // NoOp
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 56ed3cf..a2aa21c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -18,13 +18,18 @@
  */
 package org.apache.asterix.cloud.clients.aws.s3;
 
+import java.io.Serializable;
+import java.util.Map;
+
 import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
 
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
-public class S3ClientConfig {
+public final class S3ClientConfig implements Serializable {
+    private static final long serialVersionUID = 548292720313565948L;
     // The maximum number of file that can be deleted (AWS restriction)
     static final int DELETE_BATCH_SIZE = 1000;
     private final String region;
@@ -48,6 +53,21 @@
                 cloudProperties.getProfilerLogInterval());
     }
 
+    public static S3ClientConfig of(Map<String, String> configuration) {
+        // Used to determine local vs. actual S3
+        String endPoint = configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME);
+        // Disabled
+        long profilerLogInterval = 0;
+
+        // Dummy values;
+        String region = "";
+        String prefix = null;
+        boolean anonymousAuth = false;
+
+        return new S3ClientConfig(region, configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME), "",
+                anonymousAuth, profilerLogInterval);
+    }
+
     public String getRegion() {
         return region;
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 02e1d4f..b47a6ff 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -19,8 +19,8 @@
 package org.apache.asterix.cloud.clients.aws.s3;
 
 import static org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
-import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.encodeURI;
-import static org.apache.asterix.cloud.clients.aws.s3.S3Utils.listS3Objects;
+import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.encodeURI;
+import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.listS3Objects;
 
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -45,6 +45,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.annotations.ThreadSafe;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -67,21 +68,25 @@
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
+@ThreadSafe
 public class S3CloudClient implements ICloudClient {
     private final S3ClientConfig config;
     private final S3Client s3Client;
     private final IRequestProfiler profiler;
 
     public S3CloudClient(S3ClientConfig config) {
+        this(config, buildClient(config));
+    }
+
+    public S3CloudClient(S3ClientConfig config, S3Client s3Client) {
         this.config = config;
-        s3Client = buildClient();
+        this.s3Client = s3Client;
         long profilerInterval = config.getProfilerLogInterval();
         if (profilerInterval > 0) {
             profiler = new CountRequestProfiler(profilerInterval);
         } else {
             profiler = NoOpRequestProfiler.INSTANCE;
         }
-
     }
 
     @Override
@@ -245,7 +250,7 @@
         s3Client.close();
     }
 
-    private S3Client buildClient() {
+    private static S3Client buildClient(S3ClientConfig config) {
         S3ClientBuilder builder = S3Client.builder();
         builder.credentialsProvider(config.createCredentialsProvider());
         builder.region(Region.of(config.getRegion()));
@@ -264,7 +269,7 @@
     private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
         Set<String> files = new HashSet<>();
         for (S3Object s3Object : contents) {
-            String path = config.isLocalS3Provider() ? S3Utils.decodeURI(s3Object.key()) : s3Object.key();
+            String path = config.isLocalS3Provider() ? S3CloudClientUtils.decodeURI(s3Object.key()) : s3Object.key();
             if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
                 files.add(path);
             }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
similarity index 95%
rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java
rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
index 8901dec..82b0ae1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
@@ -31,9 +31,9 @@
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
-public class S3Utils {
+public class S3CloudClientUtils {
 
-    private S3Utils() {
+    private S3CloudClientUtils() {
         throw new AssertionError("do not instantiate");
     }
 
@@ -78,7 +78,7 @@
         return URLDecoder.decode(path, Charset.defaultCharset());
     }
 
-    public static String toCloudPrefix(String path) {
+    private static String toCloudPrefix(String path) {
         return path.startsWith(File.separator) ? path.substring(1) : path;
     }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
new file mode 100644
index 0000000..22095ca
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.CloudOutputStream;
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class CloudExternalFileWriter implements IExternalFileWriter {
+    private final IExternalFilePrinter printer;
+    private final ICloudClient cloudClient;
+    private final String bucket;
+    private final IWriteBufferProvider bufferProvider;
+    private ICloudBufferedWriter bufferedWriter;
+
+    public CloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket) {
+        this.printer = printer;
+        this.cloudClient = cloudClient;
+        this.bucket = bucket;
+        bufferProvider = new WriterSingleBufferProvider();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.open();
+    }
+
+    @Override
+    public void newFile(String path) throws HyracksDataException {
+        bufferedWriter = cloudClient.createBufferedWriter(bucket, path);
+        CloudResettableInputStream inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
+
+        CloudOutputStream outputStream = new CloudOutputStream(inputStream);
+        printer.newStream(outputStream);
+    }
+
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        printer.print(value);
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        bufferedWriter.abort();
+        printer.close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        printer.close();
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
new file mode 100644
index 0000000..204d5da
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
+import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
+
+public final class S3ExternalFileWriterFactory implements IExternalFileWriterFactory {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 4551318140901866805L;
+    public static final IExternalFileFilterWriterFactoryProvider PROVIDER = S3ExternalFileWriterFactory::new;
+    private final S3ClientConfig config;
+    private final Map<String, String> configuration;
+    private transient S3CloudClient cloudClient;
+
+    private S3ExternalFileWriterFactory(Map<String, String> configuration) {
+        this.config = S3ClientConfig.of(configuration);
+        this.configuration = configuration;
+        cloudClient = null;
+    }
+
+    @Override
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+            throws HyracksDataException {
+        buildClient();
+        String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        return new CloudExternalFileWriter(printerFactory.createPrinter(), cloudClient, bucket);
+    }
+
+    private void buildClient() throws HyracksDataException {
+        try {
+            synchronized (this) {
+                if (cloudClient == null) {
+                    // only a single client should be build
+                    cloudClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
+                }
+            }
+        } catch (CompilationException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public char getFileSeparator() {
+        return '/';
+    }
+
+    @Override
+    public void validate() throws AlgebricksException {
+        ICloudClient testClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
+        String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        if (bucket == null || bucket.isEmpty()) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+                    ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        }
+        try {
+            doValidate(testClient, bucket);
+        } catch (IOException e) {
+            if (e.getCause() instanceof NoSuchBucketException) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
+            } else {
+                LOGGER.fatal(e);
+                throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
+                        ExceptionUtils.getMessageOrToString(e));
+            }
+        }
+    }
+
+    private static void doValidate(ICloudClient testClient, String bucket) throws IOException {
+        Random random = new Random();
+        String pathPrefix = "testFile";
+        String path = pathPrefix + random.nextInt();
+        while (testClient.exists(bucket, path)) {
+            path = pathPrefix + random.nextInt();
+        }
+
+        long writeValue = random.nextLong();
+        byte[] data = new byte[Long.BYTES];
+        LongPointable.setLong(data, 0, writeValue);
+        ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, path);
+        CloudResettableInputStream stream = null;
+        boolean aborted = false;
+        try {
+            stream = new CloudResettableInputStream(writer, new WriterSingleBufferProvider());
+            stream.write(data, 0, data.length);
+        } catch (HyracksDataException e) {
+            stream.abort();
+        } finally {
+            if (stream != null && !aborted) {
+                stream.finish();
+                stream.close();
+            }
+        }
+
+        try {
+            long readValue = LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+            if (writeValue != readValue) {
+                // This should never happen unless S3 is messed up. But log for sanity check
+                LOGGER.warn(
+                        "The writer can write but the written values wasn't successfully read back (wrote: {}, read:{})",
+                        writeValue, readValue);
+            }
+        } finally {
+            // Delete the written file
+            testClient.deleteObjects(bucket, Collections.singleton(path));
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
new file mode 100644
index 0000000..e8983d8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class LocalFSExternalFileWriter implements IExternalFileWriter {
+    private final IExternalFilePrinter printer;
+
+    LocalFSExternalFileWriter(IExternalFilePrinter printer) {
+        this.printer = printer;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.open();
+    }
+
+    @Override
+    public void newFile(String path) throws HyracksDataException {
+        try {
+            File currentFile = new File(path);
+            if (currentFile.exists()) {
+                currentFile.delete();
+            }
+            FileUtils.createParentDirectories(currentFile);
+            currentFile.createNewFile();
+            printer.newStream(new BufferedOutputStream(new FileOutputStream(currentFile)));
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        printer.print(value);
+    }
+
+    @Override
+    public void abort() {
+        printer.close();
+    }
+
+    @Override
+    public void close() {
+        printer.close();
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
new file mode 100644
index 0000000..d206b2a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import java.io.File;
+
+import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public final class LocalFSExternalFileWriterFactory implements IExternalFileWriterFactory {
+    private static final long serialVersionUID = 871685327574547749L;
+    public static final IExternalFileFilterWriterFactoryProvider PROVIDER = c -> new LocalFSExternalFileWriterFactory();
+
+    private LocalFSExternalFileWriterFactory() {
+    }
+
+    @Override
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory) {
+        return new LocalFSExternalFileWriter(printerFactory.createPrinter());
+    }
+
+    @Override
+    public char getFileSeparator() {
+        return File.separatorChar;
+    }
+
+    @Override
+    public void validate() {
+        // NoOp
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
new file mode 100644
index 0000000..5ef196d
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.compressor;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
+    private static final long serialVersionUID = -7364595253362922025L;
+    public static IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory();
+
+    private GzipExternalFileCompressStreamFactory() {
+    }
+
+    @Override
+    public OutputStream createStream(OutputStream outputStream) throws HyracksDataException {
+        try {
+            return new GzipCompressorOutputStream(outputStream);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java
new file mode 100644
index 0000000..e46e5c0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/IExternalFileCompressStreamFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.compressor;
+
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Creates a compression {@link OutputStream}
+ */
+@FunctionalInterface
+public interface IExternalFileCompressStreamFactory extends Serializable {
+
+    /**
+     * Create a compressed stream before writing to the provided stream
+     *
+     * @param outputStream destination output stream
+     * @return compressing stream
+     */
+    OutputStream createStream(OutputStream outputStream) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java
new file mode 100644
index 0000000..8876e0f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/NoOpExternalFileCompressStreamFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.compressor;
+
+import java.io.OutputStream;
+
+public class NoOpExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
+    private static final long serialVersionUID = -2211142209501287615L;
+    public static final IExternalFileCompressStreamFactory INSTANCE = new NoOpExternalFileCompressStreamFactory();
+
+    private NoOpExternalFileCompressStreamFactory() {
+    }
+
+    @Override
+    public OutputStream createStream(OutputStream outputStream) {
+        return outputStream;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
new file mode 100644
index 0000000..bff0db8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class TextualExternalFilePrinter implements IExternalFilePrinter {
+    private final IPrinter printer;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+    private PrintStream printStream;
+
+    TextualExternalFilePrinter(IPrinter printer, IExternalFileCompressStreamFactory compressStreamFactory) {
+        this.printer = printer;
+        this.compressStreamFactory = compressStreamFactory;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.init();
+    }
+
+    @Override
+    public void newStream(OutputStream outputStream) throws HyracksDataException {
+        if (printStream != null) {
+            printStream.close();
+        }
+        printStream = new PrintStream(compressStreamFactory.createStream(outputStream));
+    }
+
+    @Override
+    public void print(IValueReference value) throws HyracksDataException {
+        printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
+        printStream.println();
+    }
+
+    @Override
+    public void close() {
+        printStream.close();
+        if (printStream.checkError()) {
+            throw new IllegalStateException("Print error. Check the logs for more information");
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
new file mode 100644
index 0000000..6778532
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+public class TextualExternalFilePrinterFactory implements IExternalFilePrinterFactory {
+    private static final long serialVersionUID = 9155959967258587588L;
+    private final IPrinterFactory printerFactory;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+
+    public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
+            IExternalFileCompressStreamFactory compressStreamFactory) {
+        this.printerFactory = printerFactory;
+        this.compressStreamFactory = compressStreamFactory;
+    }
+
+    @Override
+    public IExternalFilePrinter createPrinter() {
+        return new TextualExternalFilePrinter(printerFactory.createPrinter(), compressStreamFactory);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
index 7a017bd..deb0da9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java
@@ -62,6 +62,7 @@
     private final UTF8StringWriter utf8Writer;
 
     public static final IPointable NULL_REF = new VoidPointable();
+
     static {
         NULL_REF.set(NULL_BYTES, 0, NULL_BYTES.length);
     }
@@ -139,12 +140,9 @@
     }
 
     /**
-     * @param str
-     *            The input string
-     * @param vs
-     *            The storage buffer
-     * @param writeTag
-     *            Specifying whether a tag for the string should also be written
+     * @param str      The input string
+     * @param vs       The storage buffer
+     * @param writeTag Specifying whether a tag for the string should also be written
      */
     public void serializeString(String str, IMutableValueStorage vs, boolean writeTag) throws HyracksDataException {
         vs.reset();
@@ -219,22 +217,21 @@
      * This method takes multiple pointables, the first pointable being the pointable to write the result to, and
      * checks their ATypeTag value. If a missing or null ATypeTag is encountered, the method will set the result
      * pointable to missing or null accordingly, and will return {@code true}.
-     *
+     * <p>
      * As the missing encounter has a higher priority than the null, the method will keep checking if any missing has
      * been encountered first, if not, it will do a null check at the end.
-     *
+     * <p>
      * If the listAccessor is passed, this method will also go through any list pointable elements and search for
      * a missing value to give it a higher priority over null values. If {@code null} is passed for the listAccessor,
      * the list element check will be skipped.
      *
-     * @param result the result pointable that will hold the data
+     * @param result       the result pointable that will hold the data
      * @param listAccessor list accessor to use for check list elements.
-     * @param pointable1 the first pointable to be checked
-     * @param pointable2 the second pointable to be checked
-     * @param pointable3 the third pointable to be checked
-     * @param pointable4 the fourth pointable to be checked
-     * @param pointable5 the fourth pointable to be checked
-     *
+     * @param pointable1   the first pointable to be checked
+     * @param pointable2   the second pointable to be checked
+     * @param pointable3   the third pointable to be checked
+     * @param pointable4   the fourth pointable to be checked
+     * @param pointable5   the fourth pointable to be checked
      * @return {@code true} if the pointable value is missing or null, {@code false} otherwise.
      */
     public static boolean checkAndSetMissingOrNull(IPointable result, ListAccessor listAccessor, IPointable pointable1,
@@ -311,9 +308,8 @@
      * Checks whether the pointable {@param pointable1} is null or missing, and if true, assigns null to the
      * {@param result}.
      *
-     * @param result the result pointable that will hold the null value
+     * @param result     the result pointable that will hold the null value
      * @param pointable1 the pointable to be checked
-     *
      * @return {@code true} if the {@param pointable1} value is missing or null, {@code false} otherwise.
      */
     public static boolean checkAndSetNull(IPointable result, IPointable pointable1) throws HyracksDataException {
@@ -331,10 +327,9 @@
      * Checks whether any pointable argument is null or missing, and if true, assigns null to the
      * {@param result}.
      *
-     * @param result the result pointable that will hold the null value
+     * @param result     the result pointable that will hold the null value
      * @param pointable1 the pointable to be checked
      * @param pointable2 the pointable to be checked
-     *
      * @return {@code true} if any pointable is missing or null, {@code false} otherwise.
      */
     public static boolean checkAndSetNull(IPointable result, IPointable pointable1, IPointable pointable2)
@@ -344,14 +339,13 @@
 
     /**
      * This method checks and returns the pointable value state.
-     *
+     * <p>
      * If a ListAccessor is passed to this function, it will check if the passed pointable is a list, and if so, it
      * will search for a missing value inside the list before checking for null values. If the listAccessor value is
      * null, no list elements check will be performed.
      *
-     * @param pointable the pointable to be checked
+     * @param pointable    the pointable to be checked
      * @param listAccessor list accessor used to check the list elements.
-     *
      * @return the pointable value state for the passed pointable
      */
     private static PointableValueState getPointableValueState(IPointable pointable, ListAccessor listAccessor)
@@ -400,10 +394,9 @@
      * Check if the provided bytes are of valid long type. In case floats and doubles are accepted, the accepted
      * values will be 1.0 and 2.0, but not 2.5. (only zero decimals)
      *
-     * @param bytes data bytes
-     * @param startOffset start offset
+     * @param bytes                data bytes
+     * @param startOffset          start offset
      * @param acceptFloatAndDouble flag to accept float and double values or not
-     *
      * @return true if provided value is a valid long, false otherwise
      */
     public static boolean isValidLongValue(byte[] bytes, int startOffset, boolean acceptFloatAndDouble) {
@@ -443,4 +436,9 @@
 
         return true;
     }
+
+    public static boolean isNullOrMissing(IValueReference value) {
+        byte typeTag = value.getByteArray()[0];
+        return ATypeTag.MISSING.serialize() == typeTag || ATypeTag.NULL.serialize() == typeTag;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
new file mode 100644
index 0000000..88153de
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+abstract class AbstractPathResolver implements IPathResolver {
+    // TODO is 4-digits enough?
+    // TODO do we need jobId?
+    //4-digit format for the partition number, jobId, and file counter
+    private static final int NUMBER_OF_DIGITS = 4;
+    private static final String FILE_FORMAT = "%0" + NUMBER_OF_DIGITS + "d";
+
+    private final String fileExtension;
+    private final char fileSeparator;
+    private final int partition;
+    private final long jobId;
+    private final StringBuilder pathStringBuilder;
+    private int fileCounter;
+
+    AbstractPathResolver(String fileExtension, char fileSeparator, int partition, long jobId) {
+        this.fileExtension = fileExtension;
+        this.fileSeparator = fileSeparator;
+        this.partition = partition;
+        this.jobId = jobId;
+        pathStringBuilder = new StringBuilder();
+        fileCounter = 0;
+    }
+
+    @Override
+    public final String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException {
+        fileCounter = 0;
+        pathStringBuilder.setLength(0);
+        appendPrefix(pathStringBuilder, tuple);
+        if (pathStringBuilder.charAt(pathStringBuilder.length() - 1) != fileSeparator) {
+            pathStringBuilder.append(fileSeparator);
+        }
+        pathStringBuilder.append(String.format(FILE_FORMAT, partition));
+        pathStringBuilder.append('-');
+        pathStringBuilder.append(String.format(FILE_FORMAT, jobId));
+        pathStringBuilder.append('-');
+        pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
+        if (fileExtension != null && !fileExtension.isEmpty()) {
+            pathStringBuilder.append('.');
+            pathStringBuilder.append(fileExtension);
+        }
+        return pathStringBuilder.toString();
+    }
+
+    @Override
+    public final String getNextPath() {
+        int numOfCharToRemove = NUMBER_OF_DIGITS;
+        if (fileExtension != null && !fileExtension.isEmpty()) {
+            numOfCharToRemove += 1 + fileExtension.length();
+        }
+        pathStringBuilder.setLength(pathStringBuilder.length() - numOfCharToRemove);
+        pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
+        if (fileExtension != null && !fileExtension.isEmpty()) {
+            pathStringBuilder.append('.');
+            pathStringBuilder.append(fileExtension);
+        }
+        return pathStringBuilder.toString();
+    }
+
+    abstract void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
new file mode 100644
index 0000000..dbd97e4
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.UTFDataFormatException;
+
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.util.string.UTF8CharBuffer;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+final class DynamicPathResolver extends AbstractPathResolver {
+    private final IScalarEvaluator pathEval;
+    private final String inappropriatePartitionPath;
+    private final VoidPointable pathResult;
+    private final UTF8CharBuffer charBuffer;
+
+    DynamicPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, IScalarEvaluator pathEval,
+            String inappropriatePartitionPath, IWarningCollector warningCollector) {
+        super(fileExtension, fileSeparator, partition, jobId);
+        this.pathEval = pathEval;
+        this.inappropriatePartitionPath = inappropriatePartitionPath;
+        pathResult = new VoidPointable();
+        charBuffer = new UTF8CharBuffer();
+    }
+
+    @Override
+    void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
+        pathEval.evaluate(tuple, pathResult);
+        if (PointableHelper.isNullOrMissing(pathResult)) {
+            // TODO warn
+            pathStringBuilder.append(inappropriatePartitionPath);
+            return;
+        }
+
+        try {
+            UTF8StringUtil.readUTF8(pathResult.getByteArray(), pathResult.getStartOffset() + 1, charBuffer);
+        } catch (UTFDataFormatException e) {
+            throw HyracksDataException.create(e);
+        }
+        pathStringBuilder.append(charBuffer.getBuffer(), 0, charBuffer.getFilledLength());
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
new file mode 100644
index 0000000..a4af805
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+final class ExternalWriter implements IExternalWriter {
+    private final IPathResolver pathResolver;
+    private final IExternalFileWriter writer;
+    private final int maxResultPerFile;
+    private int tupleCounter;
+
+    public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
+        this.pathResolver = pathResolver;
+        this.writer = writer;
+        this.maxResultPerFile = maxResultPerFile;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    @Override
+    public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
+        tupleCounter = 0;
+        writer.newFile(pathResolver.getPartitionPath(tuple));
+    }
+
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+        writer.write(value);
+        tupleCounter++;
+        if (tupleCounter >= maxResultPerFile) {
+            tupleCounter = 0;
+            writer.newFile(pathResolver.getNextPath());
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        writer.abort();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
new file mode 100644
index 0000000..350c5d6
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class ExternalWriterFactory implements IExternalWriterFactory {
+    private static final long serialVersionUID = 1412969574113419638L;
+    private final IExternalFileWriterFactory writerFactory;
+    private final IExternalFilePrinterFactory printerFactory;
+    private final String fileExtension;
+    private final int maxResult;
+    private final IScalarEvaluatorFactory pathEvalFactory;
+    private final String inappropriatePartitionPath;
+    private final String staticPath;
+
+    public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
+            String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory,
+            String inappropriatePartitionPath, String staticPath) {
+        this.writerFactory = writerFactory;
+        this.printerFactory = printerFactory;
+        this.fileExtension = fileExtension;
+        this.maxResult = maxResult;
+        this.pathEvalFactory = pathEvalFactory;
+        this.inappropriatePartitionPath = inappropriatePartitionPath;
+        this.staticPath = staticPath;
+    }
+
+    @Override
+    public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
+        int partition = context.getTaskAttemptId().getTaskId().getPartition();
+        long jobId = context.getJobletContext().getJobId().getId();
+        char fileSeparator = writerFactory.getFileSeparator();
+        IPathResolver resolver;
+        if (staticPath == null) {
+            EvaluatorContext evaluatorContext = new EvaluatorContext(context);
+            IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
+            IWarningCollector warningCollector = context.getWarningCollector();
+            resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, jobId, pathEval,
+                    inappropriatePartitionPath, warningCollector);
+        } else {
+            resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, jobId, staticPath);
+        }
+        IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
+        return new ExternalWriter(resolver, writer, maxResult);
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
new file mode 100644
index 0000000..2de5e11
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.util.Map;
+
+@FunctionalInterface
+public interface IExternalFileFilterWriterFactoryProvider {
+    IExternalFileWriterFactory create(Map<String, String> configuration);
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
new file mode 100644
index 0000000..fb89f4a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.OutputStream;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * An {@link IExternalFileWriter} printer
+ */
+public interface IExternalFilePrinter {
+
+    /**
+     * Open the printer
+     */
+    void open() throws HyracksDataException;
+
+    /**
+     * Initialize the printer with a new stream
+     *
+     * @param outputStream to print to
+     */
+    void newStream(OutputStream outputStream) throws HyracksDataException;
+
+    /**
+     * Print the provided value
+     *
+     * @param value to print
+     */
+    void print(IValueReference value) throws HyracksDataException;
+
+    /**
+     * Flush and close the printer
+     */
+    void close();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
new file mode 100644
index 0000000..a4fa97b
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.Serializable;
+
+/**
+ * {@link IExternalFileWriter} printer factory
+ */
+public interface IExternalFilePrinterFactory extends Serializable {
+    /**
+     * @return a new external file printer
+     */
+    IExternalFilePrinter createPrinter();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
new file mode 100644
index 0000000..c07e8cb
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * A file writer
+ */
+public interface IExternalFileWriter {
+
+    /**
+     * Open the writer
+     */
+    void open() throws HyracksDataException;
+
+    /**
+     * Initialize the writer to write to a new path
+     *
+     * @param path of the file to writer (including the file name)
+     */
+    void newFile(String path) throws HyracksDataException;
+
+    /**
+     * Writer the provided value
+     *
+     * @param value to write
+     */
+    void write(IValueReference value) throws HyracksDataException;
+
+    /**
+     * Run the abort sequence in case of a failure
+     */
+    void abort() throws HyracksDataException;
+
+    /**
+     * Flush the final result and close the writer
+     */
+    void close() throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
new file mode 100644
index 0000000..2ab6dac
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface for writing to a storage device
+ * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
+ */
+public interface IExternalFileWriterFactory extends Serializable {
+    /**
+     * Create a writer
+     *
+     * @param context        task context
+     * @param printerFactory printer factory for writing the final result
+     * @return a new file writer
+     */
+    IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+            throws HyracksDataException;
+
+    /**
+     * @return file (or path) separator
+     */
+    char getFileSeparator();
+
+    /**
+     * Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
+     */
+    void validate() throws AlgebricksException;
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
new file mode 100644
index 0000000..2419210
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Path resolver which generates paths for the written files
+ */
+interface IPathResolver {
+
+    /**
+     * Extract the partitioning values from the provided tuple and generates the file path
+     *
+     * @param tuple contains the partitioning values
+     * @return the final path which includes the partitioning values
+     */
+    String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException;
+
+    /**
+     * @return the path of the next file to be written for the same partition
+     */
+    String getNextPath();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
new file mode 100644
index 0000000..34c6575
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+final class StaticPathResolver extends AbstractPathResolver {
+    private final String prefix;
+
+    StaticPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, String prefix) {
+        super(fileExtension, fileSeparator, partition, jobId);
+        this.prefix = prefix;
+    }
+
+    @Override
+    void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
+        pathStringBuilder.append(prefix);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
new file mode 100644
index 0000000..e96531f
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
+    private final int sourceColumn;
+    private final int[] partitionColumns;
+    private final IPointable sourceValue;
+    private final PointableTupleReference partitionColumnsPrevCopy;
+    private final PermutingFrameTupleReference partitionColumnsRef;
+    private final IBinaryComparator[] partitionComparators;
+    private final IExternalWriter writer;
+    private FrameTupleAccessor tupleAccessor;
+    private FrameTupleReference tupleRef;
+    private boolean first;
+
+    SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
+            RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+        this.sourceColumn = sourceColumn;
+        this.partitionColumns = partitionColumns;
+        this.sourceValue = new VoidPointable();
+        partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
+        partitionColumnsPrevCopy =
+                PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
+        this.partitionComparators = partitionComparators;
+        this.inputRecordDesc = inputRecordDesc;
+        this.writer = writer;
+        first = true;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        if (tupleAccessor == null) {
+            writer.open();
+            tupleAccessor = new FrameTupleAccessor(inputRecordDesc);
+            tupleRef = new FrameTupleReference();
+        }
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        tupleAccessor.reset(buffer);
+        for (int i = 0; i < tupleAccessor.getTupleCount(); i++) {
+            tupleRef.reset(tupleAccessor, i);
+            if (isNewPartition(i)) {
+                writer.initNewPartition(tupleRef);
+            }
+            setValue(tupleRef, sourceColumn, sourceValue);
+            writer.write(sourceValue);
+            partitionColumnsRef.reset(tupleAccessor, i);
+            partitionColumnsPrevCopy.set(partitionColumnsRef);
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.abort();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    private boolean isNewPartition(int index) throws HyracksDataException {
+        if (first) {
+            first = false;
+            return true;
+        }
+
+        return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor, index, partitionColumns,
+                partitionComparators);
+    }
+
+    private void setValue(IFrameTupleReference tuple, int column, IPointable value) {
+        byte[] data = tuple.getFieldData(column);
+        int start = tuple.getFieldStart(column);
+        int length = tuple.getFieldLength(column);
+        value.set(data, start, length);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
new file mode 100644
index 0000000..6220dec
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriterFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class SinkExternalWriterRuntimeFactory extends AbstractPushRuntimeFactory {
+    private static final long serialVersionUID = -2215789207336628581L;
+    private final int sourceColumn;
+    private final int[] partitionColumn;
+    private final IBinaryComparatorFactory[] partitionComparatorFactories;
+    private final RecordDescriptor inputRecordDescriptor;
+    private final IExternalWriterFactory writerFactory;
+
+    public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumn,
+            IBinaryComparatorFactory[] partitionComparatorFactories, RecordDescriptor inputRecordDescriptor,
+            IExternalWriterFactory writerFactory) {
+        this.sourceColumn = sourceColumn;
+        this.partitionColumn = partitionColumn;
+        this.partitionComparatorFactories = partitionComparatorFactories;
+        this.inputRecordDescriptor = inputRecordDescriptor;
+        this.writerFactory = writerFactory;
+    }
+
+    @Override
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+        IExternalWriter writer = writerFactory.createWriter(ctx);
+        IBinaryComparator[] partitionComparators = new IBinaryComparator[partitionComparatorFactories.length];
+        for (int i = 0; i < partitionComparatorFactories.length; i++) {
+            partitionComparators[i] = partitionComparatorFactories[i].createBinaryComparator();
+        }
+        SinkExternalWriterRuntime runtime = new SinkExternalWriterRuntime(sourceColumn, partitionColumn,
+                partitionComparators, inputRecordDescriptor, writer);
+        return new IPushRuntime[] { runtime };
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
new file mode 100644
index 0000000..81ed880
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.writers;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * An external writer of a query (or dataset) result
+ */
+public interface IExternalWriter {
+    /**
+     * Open the writer
+     */
+    void open() throws HyracksDataException;
+
+    /**
+     * Initialize the writer for a new partition
+     *
+     * @param tuple which contains the partitioning columns
+     */
+    void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException;
+
+    /**
+     * Write the provided value
+     *
+     * @param value to be written
+     */
+    void write(IValueReference value) throws HyracksDataException;
+
+    /**
+     * Run the abort sequence in case of a failure
+     */
+    void abort() throws HyracksDataException;
+
+    /**
+     * Flush the final result and close the writer
+     */
+    void close() throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
new file mode 100644
index 0000000..e6899a6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/IExternalWriterFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.writers;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A writer factory which creates a writer for the result of a query (or dataset) to a storage device
+ */
+@FunctionalInterface
+public interface IExternalWriterFactory extends Serializable {
+    /**
+     * Crete a new writer
+     *
+     * @param context task context
+     * @return new writer
+     */
+    IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java
new file mode 100644
index 0000000..30ebf69
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8CharBuffer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.string;
+
+public class UTF8CharBuffer {
+    private char[] buffer;
+    private int filledLength;
+
+    public char[] getBuffer() {
+        return buffer;
+    }
+
+    public int getFilledLength() {
+        return filledLength;
+    }
+
+    char[] getBuffer(int requiredLength) {
+        if (buffer == null || buffer.length < requiredLength) {
+            buffer = new char[requiredLength];
+        }
+
+        return buffer;
+    }
+
+    void setFilledLength(int filledLength) {
+        this.filledLength = filledLength;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index cde79cb..4d3aced 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -26,6 +26,7 @@
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.lang.ref.SoftReference;
+import java.util.Objects;
 
 import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
 
@@ -535,14 +536,29 @@
             chararr = reader.chararr;
         }
 
+        in.readFully(bytearr, 0, utflen);
+
+        int chararr_count = readUTF8(bytearr, 0, utflen, chararr);
+
+        // The number of chars produced may be less than utflen
+        return new String(chararr, 0, chararr_count);
+    }
+
+    public static void readUTF8(byte[] bytearr, int start, UTF8CharBuffer buffer) throws UTFDataFormatException {
+        Objects.requireNonNull(buffer);
+        int utflen = VarLenIntEncoderDecoder.decode(bytearr, start);
+        int lengthIntSize = VarLenIntEncoderDecoder.getBytesRequired(utflen);
+        char[] chararr = buffer.getBuffer(utflen);
+        buffer.setFilledLength(readUTF8(bytearr, start + lengthIntSize, utflen, chararr));
+    }
+
+    private static int readUTF8(byte[] bytearr, int start, int utflen, char[] chararr) throws UTFDataFormatException {
         int c, char2, char3;
         int count = 0;
         int chararr_count = 0;
 
-        in.readFully(bytearr, 0, utflen);
-
         while (count < utflen) {
-            c = bytearr[count] & 0xff;
+            c = bytearr[start + count] & 0xff;
             if (c > 127) {
                 break;
             }
@@ -596,8 +612,7 @@
                     throw new UTFDataFormatException("malformed input around byte " + count);
             }
         }
-        // The number of chars produced may be less than utflen
-        return new String(chararr, 0, chararr_count);
+        return chararr_count;
     }
 
     /**