[ASTERIXDB-3288][RT] COPY TO runtime - Part 2

- user model changes: yes
- storage format changes: no
- interface changes: yes

Details:
- Add runtime directory path validation
- Fail if the writing path exists and
  is not empty
- Changed file counter to have the maxmimum
  number of digits that Java long can hold
  instead of only 4-digits

Change-Id: I3bd18d6c77feaf90600f66a437facdbab925aef1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17890
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
index 18a97ad..349b1b1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java
@@ -21,7 +21,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 
-public class CloudOutputStream extends OutputStream {
+public final class CloudOutputStream extends OutputStream {
     private final CloudResettableInputStream inputStream;
 
     public CloudOutputStream(CloudResettableInputStream inputStream) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index f198001..0533184 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -139,7 +139,7 @@
         try {
             bufferedWriter.upload(this, writeBuffer.limit());
         } catch (Exception e) {
-            LOGGER.fatal(e);
+            LOGGER.error(e);
             throw HyracksDataException.create(e);
         }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 76a768a..7941ada 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -130,6 +130,8 @@
      */
     boolean exists(String bucket, String path) throws HyracksDataException;
 
+    boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException;
+
     /**
      * Create a parallel downloader
      *
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index a2aa21c..c23e437 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.cloud.clients.aws.s3;
 
-import java.io.Serializable;
 import java.util.Map;
 
 import org.apache.asterix.common.config.CloudProperties;
@@ -28,8 +27,7 @@
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
-public final class S3ClientConfig implements Serializable {
-    private static final long serialVersionUID = 548292720313565948L;
+public final class S3ClientConfig {
     // The maximum number of file that can be deleted (AWS restriction)
     static final int DELETE_BATCH_SIZE = 1000;
     private final String region;
@@ -55,17 +53,16 @@
 
     public static S3ClientConfig of(Map<String, String> configuration) {
         // Used to determine local vs. actual S3
-        String endPoint = configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME);
+        String endPoint = configuration.getOrDefault(S3Constants.SERVICE_END_POINT_FIELD_NAME, "");
         // Disabled
         long profilerLogInterval = 0;
 
         // Dummy values;
         String region = "";
-        String prefix = null;
+        String prefix = "";
         boolean anonymousAuth = false;
 
-        return new S3ClientConfig(region, configuration.get(S3Constants.SERVICE_END_POINT_FIELD_NAME), "",
-                anonymousAuth, profilerLogInterval);
+        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval);
     }
 
     public String getRegion() {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientUtils.java
similarity index 88%
rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientUtils.java
index 82b0ae1..fe47537 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClientUtils.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientUtils.java
@@ -31,9 +31,9 @@
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
-public class S3CloudClientUtils {
+public class S3ClientUtils {
 
-    private S3CloudClientUtils() {
+    private S3ClientUtils() {
         throw new AssertionError("do not instantiate");
     }
 
@@ -63,6 +63,13 @@
         return files;
     }
 
+    public static boolean isEmptyPrefix(S3Client s3Client, String bucket, String path) {
+        ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(bucket);
+        listObjectsBuilder.prefix(toCloudPrefix(path));
+
+        return s3Client.listObjectsV2(listObjectsBuilder.build()).contents().isEmpty();
+    }
+
     public static String encodeURI(String path) {
         if (path.isEmpty()) {
             return path;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index b47a6ff..5cdf971 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -19,8 +19,8 @@
 package org.apache.asterix.cloud.clients.aws.s3;
 
 import static org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig.DELETE_BATCH_SIZE;
-import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.encodeURI;
-import static org.apache.asterix.cloud.clients.aws.s3.S3CloudClientUtils.listS3Objects;
+import static org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.encodeURI;
+import static org.apache.asterix.cloud.clients.aws.s3.S3ClientUtils.listS3Objects;
 
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -227,6 +227,12 @@
     }
 
     @Override
+    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+        profiler.objectsList();
+        return S3ClientUtils.isEmptyPrefix(s3Client, bucket, path);
+    }
+
+    @Override
     public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
         return new S3ParallelDownloader(bucket, ioManager, config, profiler);
     }
@@ -269,7 +275,7 @@
     private Set<String> filterAndGet(List<S3Object> contents, FilenameFilter filter) {
         Set<String> files = new HashSet<>();
         for (S3Object s3Object : contents) {
-            String path = config.isLocalS3Provider() ? S3CloudClientUtils.decodeURI(s3Object.key()) : s3Object.key();
+            String path = config.isLocalS3Provider() ? S3ClientUtils.decodeURI(s3Object.key()) : s3Object.key();
             if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
                 files.add(path);
             }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
new file mode 100644
index 0000000..64eee70
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.CloudOutputStream;
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.WriterSingleBufferProvider;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+import com.google.common.base.Utf8;
+
+abstract class AbstractCloudExternalFileWriter implements IExternalFileWriter {
+    private final IExternalFilePrinter printer;
+    private final ICloudClient cloudClient;
+    private final String bucket;
+    private final boolean partitionedPath;
+    private final IWarningCollector warningCollector;
+    private final SourceLocation pathSourceLocation;
+    private final IWriteBufferProvider bufferProvider;
+    private ICloudBufferedWriter bufferedWriter;
+
+    AbstractCloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket,
+            boolean partitionedPath, IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+        this.printer = printer;
+        this.cloudClient = cloudClient;
+        this.bucket = bucket;
+        this.partitionedPath = partitionedPath;
+        this.warningCollector = warningCollector;
+        this.pathSourceLocation = pathSourceLocation;
+        bufferProvider = new WriterSingleBufferProvider();
+    }
+
+    @Override
+    public final void open() throws HyracksDataException {
+        printer.open();
+    }
+
+    @Override
+    public void validate(String directory) throws HyracksDataException {
+        if (partitionedPath && !cloudClient.isEmptyPrefix(bucket, directory)) {
+            throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, directory);
+        }
+    }
+
+    @Override
+    public final boolean newFile(String directory, String fileName) throws HyracksDataException {
+        String fullPath = directory + fileName;
+        if (isExceedingMaxLength(fullPath)) {
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(Warning.of(pathSourceLocation, ErrorCode.WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH,
+                        fullPath, getPathMaxLengthInBytes(), getAdapterName()));
+            }
+            return false;
+        }
+
+        bufferedWriter = cloudClient.createBufferedWriter(bucket, fullPath);
+        CloudResettableInputStream inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
+
+        CloudOutputStream outputStream = new CloudOutputStream(inputStream);
+        printer.newStream(outputStream);
+
+        return true;
+    }
+
+    @Override
+    public final void write(IValueReference value) throws HyracksDataException {
+        printer.print(value);
+    }
+
+    @Override
+    public final void abort() throws HyracksDataException {
+        bufferedWriter.abort();
+        printer.close();
+    }
+
+    @Override
+    public final void close() throws HyracksDataException {
+        printer.close();
+    }
+
+    abstract String getAdapterName();
+
+    abstract int getPathMaxLengthInBytes();
+
+    private boolean isExceedingMaxLength(String path) {
+        return Utf8.encodedLength(path) >= getPathMaxLengthInBytes();
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
deleted file mode 100644
index 22095ca..0000000
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/CloudExternalFileWriter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.cloud.writer;
-
-import org.apache.asterix.cloud.CloudOutputStream;
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.IWriteBufferProvider;
-import org.apache.asterix.cloud.WriterSingleBufferProvider;
-import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFileWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
-
-final class CloudExternalFileWriter implements IExternalFileWriter {
-    private final IExternalFilePrinter printer;
-    private final ICloudClient cloudClient;
-    private final String bucket;
-    private final IWriteBufferProvider bufferProvider;
-    private ICloudBufferedWriter bufferedWriter;
-
-    public CloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket) {
-        this.printer = printer;
-        this.cloudClient = cloudClient;
-        this.bucket = bucket;
-        bufferProvider = new WriterSingleBufferProvider();
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        printer.open();
-    }
-
-    @Override
-    public void newFile(String path) throws HyracksDataException {
-        bufferedWriter = cloudClient.createBufferedWriter(bucket, path);
-        CloudResettableInputStream inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider);
-
-        CloudOutputStream outputStream = new CloudOutputStream(inputStream);
-        printer.newStream(outputStream);
-    }
-
-    @Override
-    public void write(IValueReference value) throws HyracksDataException {
-        printer.print(value);
-    }
-
-    @Override
-    public void abort() throws HyracksDataException {
-        bufferedWriter.abort();
-        printer.close();
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        printer.close();
-    }
-}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
new file mode 100644
index 0000000..5fd4ea6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.writer;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+final class S3ExternalFileWriter extends AbstractCloudExternalFileWriter {
+
+    S3ExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket, boolean partitionedPath,
+            IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+        super(printer, cloudClient, bucket, partitionedPath, warningCollector, pathSourceLocation);
+    }
+
+    @Override
+    String getAdapterName() {
+        return ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3;
+    }
+
+    @Override
+    int getPathMaxLengthInBytes() {
+        return 1024;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index 204d5da..d7a51cd 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -33,13 +33,17 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalFilePrinter;
 import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.logging.log4j.LogManager;
@@ -48,16 +52,30 @@
 import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
 
 public final class S3ExternalFileWriterFactory implements IExternalFileWriterFactory {
-    private static final Logger LOGGER = LogManager.getLogger();
     private static final long serialVersionUID = 4551318140901866805L;
-    public static final IExternalFileFilterWriterFactoryProvider PROVIDER = S3ExternalFileWriterFactory::new;
-    private final S3ClientConfig config;
+    private static final Logger LOGGER = LogManager.getLogger();
+    static final char SEPARATOR = '/';
+    public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
+            new IExternalFileFilterWriterFactoryProvider() {
+                @Override
+                public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+                    return new S3ExternalFileWriterFactory(configuration);
+                }
+
+                @Override
+                public char getSeparator() {
+                    return SEPARATOR;
+                }
+            };
     private final Map<String, String> configuration;
+    private final SourceLocation pathSourceLocation;
+    private final String staticPath;
     private transient S3CloudClient cloudClient;
 
-    private S3ExternalFileWriterFactory(Map<String, String> configuration) {
-        this.config = S3ClientConfig.of(configuration);
-        this.configuration = configuration;
+    private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+        configuration = externalConfig.getConfiguration();
+        pathSourceLocation = externalConfig.getPathSourceLocation();
+        staticPath = externalConfig.getStaticPath();
         cloudClient = null;
     }
 
@@ -66,14 +84,18 @@
             throws HyracksDataException {
         buildClient();
         String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        return new CloudExternalFileWriter(printerFactory.createPrinter(), cloudClient, bucket);
+        IExternalFilePrinter printer = printerFactory.createPrinter();
+        IWarningCollector warningCollector = context.getWarningCollector();
+        return new S3ExternalFileWriter(printer, cloudClient, bucket, staticPath == null, warningCollector,
+                pathSourceLocation);
     }
 
     private void buildClient() throws HyracksDataException {
         try {
             synchronized (this) {
                 if (cloudClient == null) {
-                    // only a single client should be build
+                    // only a single client should be built
+                    S3ClientConfig config = S3ClientConfig.of(configuration);
                     cloudClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
                 }
             }
@@ -83,32 +105,40 @@
     }
 
     @Override
-    public char getFileSeparator() {
-        return '/';
+    public char getSeparator() {
+        return SEPARATOR;
     }
 
     @Override
     public void validate() throws AlgebricksException {
+        S3ClientConfig config = S3ClientConfig.of(configuration);
         ICloudClient testClient = new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration));
         String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
         if (bucket == null || bucket.isEmpty()) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
                     ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         }
+
         try {
             doValidate(testClient, bucket);
         } catch (IOException e) {
             if (e.getCause() instanceof NoSuchBucketException) {
                 throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, bucket);
             } else {
-                LOGGER.fatal(e);
+                LOGGER.error(e);
                 throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
                         ExceptionUtils.getMessageOrToString(e));
             }
         }
     }
 
-    private static void doValidate(ICloudClient testClient, String bucket) throws IOException {
+    private void doValidate(ICloudClient testClient, String bucket) throws IOException, AlgebricksException {
+        if (staticPath != null && !testClient.isEmptyPrefix(bucket, staticPath)) {
+            // Ensure that the static path is empty
+            throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+        }
+
         Random random = new Random();
         String pathPrefix = "testFile";
         String path = pathPrefix + random.nextInt();
@@ -127,6 +157,7 @@
             stream.write(data, 0, data.length);
         } catch (HyracksDataException e) {
             stream.abort();
+            aborted = true;
         } finally {
             if (stream != null && !aborted) {
                 stream.finish();
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
similarity index 96%
rename from asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
rename to asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
index 612aa1d..92d7f12 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java
@@ -16,15 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apach.asterix.cloud;
+package org.apache.asterix.cloud;
 
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
-import org.apache.asterix.cloud.CloudResettableInputStream;
-import org.apache.asterix.cloud.WriteBufferProvider;
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.logging.log4j.LogManager;
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
similarity index 97%
rename from asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java
rename to asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 06286cc..0452da0 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apach.asterix.cloud.s3;
+package org.apache.asterix.cloud.s3;
 
 import java.net.URI;
 
-import org.apach.asterix.cloud.LSMTest;
+import org.apache.asterix.cloud.LSMTest;
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import org.junit.AfterClass;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9ac8513..0c3d8ca 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -89,6 +89,10 @@
     INVALID_PARQUET_FILE(59),
     TYPE_MISMATCH_EXTRA_FIELD(60),
     TYPE_MISMATCH_MISSING_FIELD(61),
+    DIRECTORY_IS_NOT_EMPTY(62),
+    COULD_NOT_CREATE_FILE(63),
+    NON_STRING_WRITE_PATH(64),
+    WRITE_PATH_LENGTH_EXCEEDS_MAX_LENGTH(65),
 
     UNSUPPORTED_JRE(100),
 
@@ -282,6 +286,8 @@
     UNKNOWN_DATABASE(1185),
     DATABASE_EXISTS(1186),
     CANNOT_DROP_DATABASE_DEPENDENT_EXISTS(1187),
+    UNSUPPORTED_WRITING_ADAPTER(1188),
+    UNSUPPORTED_WRITING_FORMAT(1189),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 122bff6..1d48e1e 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -96,6 +96,10 @@
 59 = Invalid Parquet file: %1$s. Reason: %2$s
 60 = Type mismatch: including an extra field %1$s
 61 = Type mismatch: missing a required field %1$s: %2$s
+62 = Cannot write to a non-empty directory '%1$s'
+63 = Could not create file '%1$s'
+64 = Path expression produced a value of type '%1$s'. Path must be of type string
+65 = Length of the file path '%1$s' exceeds the maximum length of '%2$s bytes' allowed in %3$s
 
 100 = Unsupported JRE: %1$s
 
@@ -284,6 +288,8 @@
 1185 = Cannot find database with name %1$s
 1186 = A database with this name %1$s already exists
 1187 = Cannot drop database: %1$s %2$s being used by %3$s %4$s
+1188 = Unsupported writing adapter '%1$s'. Supported adapters: %2$s
+1189 = Unsupported writing format '%1$s'. Supported formats: %2$s
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 7428a4b..7c30b64 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.util;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.function.LongSupplier;
@@ -207,23 +205,13 @@
     public static final String FORMAT_PARQUET = "parquet";
     public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
     public static final Set<String> ALL_FORMATS;
+    public static final Set<String> TEXTUAL_FORMATS;
 
     static {
-        Set<String> formats = new HashSet<>(14);
-        formats.add(FORMAT_BINARY);
-        formats.add(FORMAT_ADM);
-        formats.add(FORMAT_JSON_LOWER_CASE);
-        formats.add(FORMAT_DELIMITED_TEXT);
-        formats.add(FORMAT_TWEET);
-        formats.add(FORMAT_RSS);
-        formats.add(FORMAT_SEMISTRUCTURED);
-        formats.add(FORMAT_LINE_SEPARATED);
-        formats.add(FORMAT_HDFS_WRITABLE);
-        formats.add(FORMAT_KV);
-        formats.add(FORMAT_CSV);
-        formats.add(FORMAT_TSV);
-        formats.add(FORMAT_PARQUET);
-        ALL_FORMATS = Collections.unmodifiableSet(formats);
+        ALL_FORMATS = Set.of(FORMAT_BINARY, FORMAT_ADM, FORMAT_JSON_LOWER_CASE, FORMAT_DELIMITED_TEXT, FORMAT_TWEET,
+                FORMAT_RSS, FORMAT_SEMISTRUCTURED, FORMAT_LINE_SEPARATED, FORMAT_HDFS_WRITABLE, FORMAT_KV, FORMAT_CSV,
+                FORMAT_TSV, FORMAT_PARQUET);
+        TEXTUAL_FORMATS = Set.of(FORMAT_ADM, FORMAT_JSON_LOWER_CASE, FORMAT_CSV, FORMAT_TSV);
     }
 
     /**
@@ -311,6 +299,27 @@
     public static final String PREFIX_DEFAULT_DELIMITER = "/";
     public static final Pattern COMPUTED_FIELD_PATTERN = Pattern.compile("\\{[^{}:]+:[^{}:]+}");
 
+    /**
+     * Compression constants
+     */
+    public static final String KEY_COMPRESSION_GZIP = "gzip";
+
+    /**
+     * Writer Constants
+     */
+    public static final String KEY_WRITER_MAX_RESULT = "max-objects-per-file";
+    public static final String KEY_WRITER_COMPRESSION = "compression";
+    public static final int WRITER_MAX_RESULT_DEFAULT = 1000;
+    public static final Set<String> WRITER_SUPPORTED_FORMATS;
+    public static final Set<String> WRITER_SUPPORTED_ADAPTERS;
+    public static final Set<String> WRITER_SUPPORTED_COMPRESSION;
+
+    static {
+        WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
+        WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
+        WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
+    }
+
     public static class ParquetOptions {
         private ParquetOptions() {
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
new file mode 100644
index 0000000..4c848b1
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public class WriterValidationUtil {
+
+    private WriterValidationUtil() {
+    }
+
+    public static void validateWriterConfiguration(String adapter, Set<String> supportedAdapters,
+            Map<String, String> configuration, SourceLocation sourceLocation) throws CompilationException {
+        validateAdapter(adapter, supportedAdapters, sourceLocation);
+        validateFormat(configuration, sourceLocation);
+        validateCompression(configuration, sourceLocation);
+        validateMaxResult(configuration, sourceLocation);
+    }
+
+    private static void validateAdapter(String adapter, Set<String> supportedAdapters, SourceLocation sourceLocation)
+            throws CompilationException {
+        checkSupported(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter, supportedAdapters,
+                ErrorCode.UNSUPPORTED_WRITING_ADAPTER, sourceLocation, false);
+    }
+
+    private static void validateFormat(Map<String, String> configuration, SourceLocation sourceLocation)
+            throws CompilationException {
+        String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
+        checkSupported(ExternalDataConstants.KEY_FORMAT, format, ExternalDataConstants.WRITER_SUPPORTED_FORMATS,
+                ErrorCode.UNSUPPORTED_WRITING_FORMAT, sourceLocation, false);
+    }
+
+    private static void validateCompression(Map<String, String> configuration, SourceLocation sourceLocation)
+            throws CompilationException {
+        String compression = configuration.get(ExternalDataConstants.KEY_WRITER_COMPRESSION);
+        checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, compression,
+                ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, ErrorCode.UNKNOWN_COMPRESSION_SCHEME,
+                sourceLocation, true);
+    }
+
+    private static void validateMaxResult(Map<String, String> configuration, SourceLocation sourceLocation)
+            throws CompilationException {
+        String maxResult = configuration.get(ExternalDataConstants.KEY_WRITER_MAX_RESULT);
+        if (maxResult == null) {
+            return;
+        }
+
+        try {
+            Integer.parseInt(maxResult);
+        } catch (NumberFormatException e) {
+            throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, maxResult);
+        }
+    }
+
+    private static void checkSupported(String paramKey, String value, Set<String> supportedSet, ErrorCode errorCode,
+            SourceLocation sourceLocation, boolean optional) throws CompilationException {
+        if (optional && value == null) {
+            return;
+        }
+
+        if (value == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, sourceLocation, paramKey);
+        }
+
+        String normalizedValue = value.toLowerCase();
+        if (!supportedSet.contains(normalizedValue)) {
+            throw CompilationException.create(errorCode, sourceLocation, value, supportedSet.toString());
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/ILocalFSValidator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/ILocalFSValidator.java
new file mode 100644
index 0000000..c026607
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/ILocalFSValidator.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+interface ILocalFSValidator {
+    void validate(String directory, SourceLocation sourceLocation) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
index e8983d8..a3a2f70 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -23,17 +23,25 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.runtime.writer.IExternalFilePrinter;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IValueReference;
 
 final class LocalFSExternalFileWriter implements IExternalFileWriter {
     private final IExternalFilePrinter printer;
+    private final ILocalFSValidator validator;
+    private final SourceLocation pathSourceLocation;
 
-    LocalFSExternalFileWriter(IExternalFilePrinter printer) {
+    LocalFSExternalFileWriter(IExternalFilePrinter printer, ILocalFSValidator validator,
+            SourceLocation pathSourceLocation) {
         this.printer = printer;
+        this.validator = validator;
+        this.pathSourceLocation = pathSourceLocation;
     }
 
     @Override
@@ -42,18 +50,24 @@
     }
 
     @Override
-    public void newFile(String path) throws HyracksDataException {
+    public void validate(String directory) throws HyracksDataException {
+        validator.validate(directory, pathSourceLocation);
+    }
+
+    @Override
+    public boolean newFile(String directory, String fileName) throws HyracksDataException {
         try {
-            File currentFile = new File(path);
-            if (currentFile.exists()) {
-                currentFile.delete();
-            }
+            File parentDirectory = new File(directory);
+            File currentFile = new File(parentDirectory, fileName);
             FileUtils.createParentDirectories(currentFile);
-            currentFile.createNewFile();
+            if (!currentFile.createNewFile()) {
+                throw RuntimeDataException.create(ErrorCode.COULD_NOT_CREATE_FILE, currentFile.getAbsolutePath());
+            }
             printer.newStream(new BufferedOutputStream(new FileOutputStream(currentFile)));
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
+        return true;
     }
 
     @Override
@@ -62,12 +76,12 @@
     }
 
     @Override
-    public void abort() {
+    public void abort() throws HyracksDataException {
         printer.close();
     }
 
     @Override
-    public void close() {
+    public void close() throws HyracksDataException {
         printer.close();
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index d206b2a..313757a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -20,31 +20,106 @@
 
 import java.io.File;
 
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
 import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public final class LocalFSExternalFileWriterFactory implements IExternalFileWriterFactory {
     private static final long serialVersionUID = 871685327574547749L;
-    public static final IExternalFileFilterWriterFactoryProvider PROVIDER = c -> new LocalFSExternalFileWriterFactory();
+    private static final char SEPARATOR = File.separatorChar;
+    public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
+            new IExternalFileFilterWriterFactoryProvider() {
+                @Override
+                public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
+                    return new LocalFSExternalFileWriterFactory(configuration);
+                }
 
-    private LocalFSExternalFileWriterFactory() {
+                @Override
+                public char getSeparator() {
+                    return SEPARATOR;
+                }
+            };
+    private static final ILocalFSValidator NO_OP_VALIDATOR = LocalFSExternalFileWriterFactory::noOpValidation;
+    private static final ILocalFSValidator VALIDATOR = LocalFSExternalFileWriterFactory::validate;
+    private final SourceLocation pathSourceLocation;
+    private final boolean singleNodeCluster;
+    private final String staticPath;
+    private boolean validated;
+
+    private LocalFSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+        pathSourceLocation = externalConfig.getPathSourceLocation();
+        singleNodeCluster = externalConfig.isSingleNodeCluster();
+        staticPath = externalConfig.getStaticPath();
+        validated = false;
     }
 
     @Override
-    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory) {
-        return new LocalFSExternalFileWriter(printerFactory.createPrinter());
+    public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+            throws HyracksDataException {
+        ILocalFSValidator validator = VALIDATOR;
+        if (staticPath != null) {
+            synchronized (this) {
+                validateStaticPath();
+            }
+            validator = NO_OP_VALIDATOR;
+        }
+        return new LocalFSExternalFileWriter(printerFactory.createPrinter(), validator, pathSourceLocation);
     }
 
     @Override
-    public char getFileSeparator() {
-        return File.separatorChar;
+    public char getSeparator() {
+        return SEPARATOR;
     }
 
     @Override
-    public void validate() {
+    public void validate() throws AlgebricksException {
+        // A special case validation for a single node cluster
+        if (singleNodeCluster && staticPath != null) {
+            if (isNonEmptyDirectory(new File(staticPath))) {
+                throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+            }
+
+            // Ensure that it is not validated again in a single node cluster
+            validated = true;
+        }
+    }
+
+    private void validateStaticPath() throws HyracksDataException {
+        if (validated) {
+            return;
+        }
+
+        if (isNonEmptyDirectory(new File(staticPath))) {
+            throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
+        }
+        validated = true;
+    }
+
+    static boolean isNonEmptyDirectory(File parentDirectory) {
+        if (parentDirectory.exists()) {
+            String[] files = parentDirectory.list();
+            return files != null && files.length != 0;
+        }
+
+        return false;
+    }
+
+    private static void noOpValidation(String directory, SourceLocation sourceLocation) {
         // NoOp
     }
+
+    private static void validate(String directory, SourceLocation sourceLocation) throws HyracksDataException {
+        if (isNonEmptyDirectory(new File(directory))) {
+            throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, sourceLocation, directory);
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
index bff0db8..8f8c63a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -30,6 +30,7 @@
 final class TextualExternalFilePrinter implements IExternalFilePrinter {
     private final IPrinter printer;
     private final IExternalFileCompressStreamFactory compressStreamFactory;
+    private TextualOutputStreamDelegate delegate;
     private PrintStream printStream;
 
     TextualExternalFilePrinter(IPrinter printer, IExternalFileCompressStreamFactory compressStreamFactory) {
@@ -45,22 +46,26 @@
     @Override
     public void newStream(OutputStream outputStream) throws HyracksDataException {
         if (printStream != null) {
-            printStream.close();
+            close();
         }
-        printStream = new PrintStream(compressStreamFactory.createStream(outputStream));
+        delegate = new TextualOutputStreamDelegate(compressStreamFactory.createStream(outputStream));
+        printStream = new PrintStream(delegate);
     }
 
     @Override
     public void print(IValueReference value) throws HyracksDataException {
         printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
         printStream.println();
+        delegate.checkError();
     }
 
     @Override
-    public void close() {
-        printStream.close();
-        if (printStream.checkError()) {
-            throw new IllegalStateException("Print error. Check the logs for more information");
+    public void close() throws HyracksDataException {
+        if (printStream != null) {
+            printStream.close();
+            printStream = null;
+            delegate.checkError();
+            delegate = null;
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
new file mode 100644
index 0000000..2ec2661
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualOutputStreamDelegate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * The purpose of this is to catch and check an error that could be thrown by {@link #stream} when printing results.
+ * {@link java.io.PrintStream} doesn't throw any exception on write. Hence, we can miss all errors thrown during
+ * write operations.
+ *
+ * @see TextualExternalFilePrinter#print(IValueReference)
+ */
+final class TextualOutputStreamDelegate extends OutputStream {
+    private final OutputStream stream;
+    private Exception exception;
+
+    TextualOutputStreamDelegate(OutputStream stream) {
+        this.stream = Objects.requireNonNull(stream);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        try {
+            stream.write(b, off, len);
+        } catch (Exception e) {
+            this.exception = e;
+            throw e;
+        }
+
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        try {
+            stream.write(b);
+        } catch (Exception e) {
+            this.exception = e;
+            throw e;
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        try {
+            stream.write(b);
+        } catch (Exception e) {
+            this.exception = e;
+            throw e;
+        }
+    }
+
+    @Override
+    public synchronized void flush() throws IOException {
+        try {
+            stream.flush();
+        } catch (Exception e) {
+            this.exception = e;
+            throw e;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            stream.close();
+        } catch (Exception e) {
+            this.exception = e;
+            throw e;
+        }
+    }
+
+    void checkError() throws HyracksDataException {
+        if (exception != null) {
+            throw HyracksDataException.create(exception);
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
index 88153de..bacf434 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/AbstractPathResolver.java
@@ -18,66 +18,52 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-
 abstract class AbstractPathResolver implements IPathResolver {
-    // TODO is 4-digits enough?
-    // TODO do we need jobId?
     //4-digit format for the partition number, jobId, and file counter
-    private static final int NUMBER_OF_DIGITS = 4;
-    private static final String FILE_FORMAT = "%0" + NUMBER_OF_DIGITS + "d";
+    private static final int PREFIX_NUMBER_OF_DIGITS = 4;
+    private static final String PREFIX_FILE_FORMAT = "%0" + PREFIX_NUMBER_OF_DIGITS + "d";
+    private static final String FILE_COUNTER_FORMAT = "%0" + getMaxNumberOfDigits() + "d";
 
     private final String fileExtension;
-    private final char fileSeparator;
-    private final int partition;
-    private final long jobId;
-    private final StringBuilder pathStringBuilder;
-    private int fileCounter;
+    private final int fileCounterOffset;
+    private final StringBuilder fileStringBuilder;
+    protected final char fileSeparator;
+    private long fileCounter;
 
-    AbstractPathResolver(String fileExtension, char fileSeparator, int partition, long jobId) {
+    AbstractPathResolver(String fileExtension, char fileSeparator, int partition) {
         this.fileExtension = fileExtension;
         this.fileSeparator = fileSeparator;
-        this.partition = partition;
-        this.jobId = jobId;
-        pathStringBuilder = new StringBuilder();
+        fileStringBuilder = new StringBuilder();
+        fileCounterOffset = initFileBuilder(fileStringBuilder, partition, fileExtension);
         fileCounter = 0;
     }
 
     @Override
-    public final String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException {
-        fileCounter = 0;
-        pathStringBuilder.setLength(0);
-        appendPrefix(pathStringBuilder, tuple);
-        if (pathStringBuilder.charAt(pathStringBuilder.length() - 1) != fileSeparator) {
-            pathStringBuilder.append(fileSeparator);
-        }
-        pathStringBuilder.append(String.format(FILE_FORMAT, partition));
-        pathStringBuilder.append('-');
-        pathStringBuilder.append(String.format(FILE_FORMAT, jobId));
-        pathStringBuilder.append('-');
-        pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
+    public final String getNextFileName() {
+        fileStringBuilder.setLength(fileCounterOffset);
+        fileStringBuilder.append(String.format(FILE_COUNTER_FORMAT, fileCounter++));
         if (fileExtension != null && !fileExtension.isEmpty()) {
-            pathStringBuilder.append('.');
-            pathStringBuilder.append(fileExtension);
+            fileStringBuilder.append('.');
+            fileStringBuilder.append(fileExtension);
         }
-        return pathStringBuilder.toString();
+        return fileStringBuilder.toString();
     }
 
-    @Override
-    public final String getNextPath() {
-        int numOfCharToRemove = NUMBER_OF_DIGITS;
+    private static int initFileBuilder(StringBuilder fileStringBuilder, int partition, String fileExtension) {
+        fileStringBuilder.append(String.format(PREFIX_FILE_FORMAT, partition));
+        fileStringBuilder.append('-');
+        int offset = fileStringBuilder.length();
+        // dummy
+        fileStringBuilder.append(String.format(FILE_COUNTER_FORMAT, 0));
         if (fileExtension != null && !fileExtension.isEmpty()) {
-            numOfCharToRemove += 1 + fileExtension.length();
+            fileStringBuilder.append('.');
+            fileStringBuilder.append(fileExtension);
         }
-        pathStringBuilder.setLength(pathStringBuilder.length() - numOfCharToRemove);
-        pathStringBuilder.append(String.format(FILE_FORMAT, fileCounter++));
-        if (fileExtension != null && !fileExtension.isEmpty()) {
-            pathStringBuilder.append('.');
-            pathStringBuilder.append(fileExtension);
-        }
-        return pathStringBuilder.toString();
+        return offset;
     }
 
-    abstract void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException;
+    private static int getMaxNumberOfDigits() {
+        return Long.toString(Long.MAX_VALUE).length();
+    }
+
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index dbd97e4..a1d2411 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -20,10 +20,13 @@
 
 import java.io.UTFDataFormatException;
 
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.util.string.UTF8CharBuffer;
@@ -31,26 +34,44 @@
 
 final class DynamicPathResolver extends AbstractPathResolver {
     private final IScalarEvaluator pathEval;
-    private final String inappropriatePartitionPath;
+    private final IWarningCollector warningCollector;
+    private final StringBuilder dirStringBuilder;
     private final VoidPointable pathResult;
     private final UTF8CharBuffer charBuffer;
+    private final SourceLocation pathSourceLocation;
 
-    DynamicPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, IScalarEvaluator pathEval,
-            String inappropriatePartitionPath, IWarningCollector warningCollector) {
-        super(fileExtension, fileSeparator, partition, jobId);
+    DynamicPathResolver(String fileExtension, char fileSeparator, int partition, IScalarEvaluator pathEval,
+            IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
+        super(fileExtension, fileSeparator, partition);
         this.pathEval = pathEval;
-        this.inappropriatePartitionPath = inappropriatePartitionPath;
+        this.warningCollector = warningCollector;
+        this.pathSourceLocation = pathSourceLocation;
+        dirStringBuilder = new StringBuilder();
         pathResult = new VoidPointable();
         charBuffer = new UTF8CharBuffer();
     }
 
     @Override
-    void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
+    public String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException {
+        if (!appendPrefix(tuple)) {
+            return ExternalWriter.UNRESOLVABLE_PATH;
+        }
+
+        if (dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
+            dirStringBuilder.append(fileSeparator);
+        }
+        return dirStringBuilder.toString();
+    }
+
+    private boolean appendPrefix(IFrameTupleReference tuple) throws HyracksDataException {
+        dirStringBuilder.setLength(0);
         pathEval.evaluate(tuple, pathResult);
-        if (PointableHelper.isNullOrMissing(pathResult)) {
-            // TODO warn
-            pathStringBuilder.append(inappropriatePartitionPath);
-            return;
+        ATypeTag typeTag = ATypeTag.VALUE_TYPE_MAPPING[pathResult.getByteArray()[0]];
+        if (typeTag != ATypeTag.STRING) {
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(Warning.of(pathSourceLocation, ErrorCode.NON_STRING_WRITE_PATH, typeTag));
+            }
+            return false;
         }
 
         try {
@@ -58,6 +79,7 @@
         } catch (UTFDataFormatException e) {
             throw HyracksDataException.create(e);
         }
-        pathStringBuilder.append(charBuffer.getBuffer(), 0, charBuffer.getFilledLength());
+        dirStringBuilder.append(charBuffer.getBuffer(), 0, charBuffer.getFilledLength());
+        return true;
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
new file mode 100644
index 0000000..b62a07a
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class ExternalFileWriterConfiguration {
+    private final Map<String, String> configuration;
+    private final SourceLocation pathSourceLocation;
+    private final String staticPath;
+    private final boolean singleNodeCluster;
+
+    public ExternalFileWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
+            String staticPath, boolean singleNodeCluster) {
+        this.configuration = configuration;
+        this.pathSourceLocation = pathSourceLocation;
+        this.staticPath = staticPath;
+        this.singleNodeCluster = singleNodeCluster;
+    }
+
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+    public SourceLocation getPathSourceLocation() {
+        return pathSourceLocation;
+    }
+
+    public String getStaticPath() {
+        return staticPath;
+    }
+
+    public boolean isSingleNodeCluster() {
+        return singleNodeCluster;
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
index a4af805..5fc07af 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
@@ -24,9 +24,11 @@
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 final class ExternalWriter implements IExternalWriter {
+    static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
     private final IPathResolver pathResolver;
     private final IExternalFileWriter writer;
     private final int maxResultPerFile;
+    private String partitionPath;
     private int tupleCounter;
 
     public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
@@ -42,17 +44,23 @@
 
     @Override
     public void initNewPartition(IFrameTupleReference tuple) throws HyracksDataException {
-        tupleCounter = 0;
-        writer.newFile(pathResolver.getPartitionPath(tuple));
+        partitionPath = pathResolver.getPartitionDirectory(tuple);
+        if (UNRESOLVABLE_PATH != partitionPath) {
+            writer.validate(partitionPath);
+            newFile();
+        }
     }
 
     @Override
     public void write(IValueReference value) throws HyracksDataException {
+        if (UNRESOLVABLE_PATH == partitionPath) {
+            // Ignore writing values for unresolvable partition paths
+            return;
+        }
         writer.write(value);
         tupleCounter++;
         if (tupleCounter >= maxResultPerFile) {
-            tupleCounter = 0;
-            writer.newFile(pathResolver.getNextPath());
+            newFile();
         }
     }
 
@@ -65,4 +73,12 @@
     public void close() throws HyracksDataException {
         writer.close();
     }
+
+    private void newFile() throws HyracksDataException {
+        tupleCounter = 0;
+        if (!writer.newFile(partitionPath, pathResolver.getNextFileName())) {
+            // the partitionPath could contain illegal chars or the length of the total path is too long
+            partitionPath = UNRESOLVABLE_PATH;
+        }
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
index 350c5d6..e7c0db0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class ExternalWriterFactory implements IExternalWriterFactory {
     private static final long serialVersionUID = 1412969574113419638L;
@@ -34,35 +35,34 @@
     private final String fileExtension;
     private final int maxResult;
     private final IScalarEvaluatorFactory pathEvalFactory;
-    private final String inappropriatePartitionPath;
     private final String staticPath;
+    private final SourceLocation pathSourceLocation;
 
     public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
-            String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory,
-            String inappropriatePartitionPath, String staticPath) {
+            String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
+            SourceLocation pathSourceLocation) {
         this.writerFactory = writerFactory;
         this.printerFactory = printerFactory;
         this.fileExtension = fileExtension;
         this.maxResult = maxResult;
         this.pathEvalFactory = pathEvalFactory;
-        this.inappropriatePartitionPath = inappropriatePartitionPath;
         this.staticPath = staticPath;
+        this.pathSourceLocation = pathSourceLocation;
     }
 
     @Override
     public IExternalWriter createWriter(IHyracksTaskContext context) throws HyracksDataException {
         int partition = context.getTaskAttemptId().getTaskId().getPartition();
-        long jobId = context.getJobletContext().getJobId().getId();
-        char fileSeparator = writerFactory.getFileSeparator();
+        char fileSeparator = writerFactory.getSeparator();
         IPathResolver resolver;
         if (staticPath == null) {
             EvaluatorContext evaluatorContext = new EvaluatorContext(context);
             IScalarEvaluator pathEval = pathEvalFactory.createScalarEvaluator(evaluatorContext);
             IWarningCollector warningCollector = context.getWarningCollector();
-            resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, jobId, pathEval,
-                    inappropriatePartitionPath, warningCollector);
+            resolver = new DynamicPathResolver(fileExtension, fileSeparator, partition, pathEval, warningCollector,
+                    pathSourceLocation);
         } else {
-            resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, jobId, staticPath);
+            resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
         }
         IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
         return new ExternalWriter(resolver, writer, maxResult);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
index 2de5e11..7a863f7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
@@ -18,9 +18,8 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import java.util.Map;
-
-@FunctionalInterface
 public interface IExternalFileFilterWriterFactoryProvider {
-    IExternalFileWriterFactory create(Map<String, String> configuration);
+    IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration);
+
+    char getSeparator();
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
index fb89f4a..ba5fa1d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
@@ -50,5 +50,5 @@
     /**
      * Flush and close the printer
      */
-    void close();
+    void close() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
index c07e8cb..f8ae81b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriter.java
@@ -32,11 +32,20 @@
     void open() throws HyracksDataException;
 
     /**
+     * Validate the writing directory
+     *
+     * @param directory to write
+     */
+    void validate(String directory) throws HyracksDataException;
+
+    /**
      * Initialize the writer to write to a new path
      *
-     * @param path of the file to writer (including the file name)
+     * @param directory of where to write the file
+     * @param fileName  of the file name to create
+     * @return true if a new file can be created, false otherwise
      */
-    void newFile(String path) throws HyracksDataException;
+    boolean newFile(String directory, String fileName) throws HyracksDataException;
 
     /**
      * Writer the provided value
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
index 2ab6dac..d8f1f84 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -42,7 +42,7 @@
     /**
      * @return file (or path) separator
      */
-    char getFileSeparator();
+    char getSeparator();
 
     /**
      * Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
index 2419210..35b4ddd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IPathResolver.java
@@ -32,10 +32,11 @@
      * @param tuple contains the partitioning values
      * @return the final path which includes the partitioning values
      */
-    String getPartitionPath(IFrameTupleReference tuple) throws HyracksDataException;
+    String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException;
 
     /**
      * @return the path of the next file to be written for the same partition
      */
-    String getNextPath();
+    String getNextFileName();
+
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
index 34c6575..4caf44f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/StaticPathResolver.java
@@ -18,19 +18,23 @@
  */
 package org.apache.asterix.runtime.writer;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 final class StaticPathResolver extends AbstractPathResolver {
-    private final String prefix;
+    private final String directoryPath;
 
-    StaticPathResolver(String fileExtension, char fileSeparator, int partition, long jobId, String prefix) {
-        super(fileExtension, fileSeparator, partition, jobId);
-        this.prefix = prefix;
+    StaticPathResolver(String fileExtension, char fileSeparator, int partition, String directoryPath) {
+        super(fileExtension, fileSeparator, partition);
+
+        if (directoryPath.charAt(directoryPath.length() - 1) != fileSeparator) {
+            this.directoryPath = directoryPath + fileSeparator;
+        } else {
+            this.directoryPath = directoryPath;
+        }
     }
 
     @Override
-    void appendPrefix(StringBuilder pathStringBuilder, IFrameTupleReference tuple) throws HyracksDataException {
-        pathStringBuilder.append(prefix);
+    public String getPartitionDirectory(IFrameTupleReference tuple) {
+        return directoryPath;
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
index e96531f..01e137b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -22,6 +22,7 @@
 
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
 import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,6 +47,7 @@
     private FrameTupleAccessor tupleAccessor;
     private FrameTupleReference tupleRef;
     private boolean first;
+    private IFrameWriter frameWriter;
 
     SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
             RecordDescriptor inputRecordDesc, IExternalWriter writer) {
@@ -68,6 +70,7 @@
             tupleAccessor = new FrameTupleAccessor(inputRecordDesc);
             tupleRef = new FrameTupleReference();
         }
+        this.frameWriter.open();
     }
 
     @Override
@@ -88,11 +91,18 @@
     @Override
     public void fail() throws HyracksDataException {
         writer.abort();
+        frameWriter.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
         writer.close();
+        frameWriter.close();
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter frameWriter, RecordDescriptor recordDesc) {
+        this.frameWriter = frameWriter;
     }
 
     private boolean isNewPartition(int index) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7291473..af95280 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -154,6 +154,7 @@
     PARSING_ERROR(124),
     INVALID_INVERTED_LIST_TYPE_TRAITS(125),
     ILLEGAL_STATE(126),
+    UNSUPPORTED_WRITE_SPEC(127),
 
     // Compilation error codes.
     RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 4d9c60b..b2e6312 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -144,6 +144,7 @@
 124 = Parsing error %s: %s
 125 = Invalid inverted list type traits: %1$s
 126 = Illegal state. %1$s
+127 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s