[ASTERIXDB-3357][COMP][RT] Compiler and runtime support for COPY TO statement
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This change provides the compiler and runtime support
for COPY TO statement to write to different destinations.
Change-Id: Icea1ff9f32fe49ee83d0739f5c5a305c3345faa7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18169
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 7dd0217..0949478 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -361,7 +361,7 @@
return translate(expr, outputDatasetName, (ICompiledDmlStatement) stmt, null, resultMetadata);
}
- private ILogicalPlan translateCopyTo(Query expr, CompiledStatements.ICompiledStatement stmt,
+ public ILogicalPlan translateCopyTo(Query expr, CompiledStatements.ICompiledStatement stmt,
IResultMetadata resultMetadata) throws AlgebricksException {
CompiledStatements.CompiledCopyToStatement copyTo = (CompiledStatements.CompiledCopyToStatement) stmt;
MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
@@ -423,8 +423,7 @@
// astPathExpressions has at least one expression see CopyToStatement constructor
List<Expression> astPathExpressions = copyTo.getPathExpressions();
ILogicalExpression fullPathExpr = null;
- WriteDataSink writeDataSink;
- String separator = String.valueOf(ExternalWriterProvider.getSeparator(copyTo.getAdapter()));
+ String separator = getExternalWriterSeparator(copyTo.getAdapter());
List<Mutable<ILogicalExpression>> pathExprs = new ArrayList<>(astPathExpressions.size());
Pair<ILogicalExpression, Mutable<ILogicalOperator>> pathExprPair;
for (int i = 0; i < astPathExpressions.size(); i++) {
@@ -453,11 +452,25 @@
fullPathExpr = concat;
}
+ // Handle key
+ boolean autogenerated = copyTo.isAutogenerated();
+ List<Expression> astKeyExpressions = copyTo.getKeyExpressions();
+ List<Mutable<ILogicalExpression>> keyExpressionRefs = new ArrayList<>(astKeyExpressions.size());
+ for (int i = 0; i < copyTo.getKeyExpressions().size(); i++) {
+ Expression expression = astKeyExpressions.get(i);
+ Pair<ILogicalExpression, Mutable<ILogicalOperator>> expPair = langExprToAlgExpression(expression, topOpRef);
+ keyExpressionRefs.add(new MutableObject<>(expPair.first));
+ Pair<Mutable<ILogicalExpression>, Mutable<ILogicalOperator>> wrappedPair =
+ wrapInAssign(context.newVar(), expPair.first, expPair.second);
+ topOpRef = wrappedPair.second;
+ }
+
// Write adapter configuration
- writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties());
+ WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties());
+
// writeOperator
WriteOperator writeOperator = new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr),
- partitionExpressionRefs, orderExprListOut, writeDataSink);
+ partitionExpressionRefs, orderExprListOut, keyExpressionRefs, autogenerated, writeDataSink);
writeOperator.getInputs().add(topOpRef);
// We need DistributeResultOperator to ensure all warnings to be delivered to the user
@@ -470,6 +483,10 @@
return new ALogicalPlanImpl(globalPlanRoots);
}
+ protected String getExternalWriterSeparator(String adapter) {
+ return String.valueOf(ExternalWriterProvider.getSeparator(adapter));
+ }
+
public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index 73a0aeb..e2c09eb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -28,8 +28,8 @@
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -39,7 +39,7 @@
import com.google.common.base.Utf8;
abstract class AbstractCloudExternalFileWriter implements IExternalFileWriter {
- private final IExternalFilePrinter printer;
+ private final IExternalPrinter printer;
private final ICloudClient cloudClient;
private final String bucket;
private final boolean partitionedPath;
@@ -48,7 +48,7 @@
private final IWriteBufferProvider bufferProvider;
private ICloudBufferedWriter bufferedWriter;
- AbstractCloudExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket,
+ AbstractCloudExternalFileWriter(IExternalPrinter printer, ICloudClient cloudClient, String bucket,
boolean partitionedPath, IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
this.printer = printer;
this.cloudClient = cloudClient;
@@ -118,7 +118,7 @@
if (isSdkException(e)) {
throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
}
- throw e;
+ throw HyracksDataException.create(e);
}
}
@@ -132,7 +132,7 @@
if (isSdkException(e)) {
throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
}
- throw e;
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
index e896c05..30ae0fa 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriter.java
@@ -20,7 +20,7 @@
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -29,7 +29,7 @@
final class S3ExternalFileWriter extends AbstractCloudExternalFileWriter {
static int MAX_LENGTH_IN_BYTES = 1024;
- S3ExternalFileWriter(IExternalFilePrinter printer, ICloudClient cloudClient, String bucket, boolean partitionedPath,
+ S3ExternalFileWriter(IExternalPrinter printer, ICloudClient cloudClient, String bucket, boolean partitionedPath,
IWarningCollector warningCollector, SourceLocation pathSourceLocation) {
super(printer, cloudClient, bucket, partitionedPath, warningCollector, pathSourceLocation);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index 5036fc8..4477b1f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -36,12 +36,12 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
-import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -59,24 +59,23 @@
private static final long serialVersionUID = 4551318140901866805L;
private static final Logger LOGGER = LogManager.getLogger();
static final char SEPARATOR = '/';
- public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
- new IExternalFileFilterWriterFactoryProvider() {
- @Override
- public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
- return new S3ExternalFileWriterFactory(configuration);
- }
+ public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
+ @Override
+ public IExternalFileWriterFactory create(ExternalWriterConfiguration configuration) {
+ return new S3ExternalFileWriterFactory(configuration);
+ }
- @Override
- public char getSeparator() {
- return SEPARATOR;
- }
- };
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+ };
private final Map<String, String> configuration;
private final SourceLocation pathSourceLocation;
private final String staticPath;
private transient S3CloudClient cloudClient;
- private S3ExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+ private S3ExternalFileWriterFactory(ExternalWriterConfiguration externalConfig) {
configuration = externalConfig.getConfiguration();
pathSourceLocation = externalConfig.getPathSourceLocation();
staticPath = externalConfig.getStaticPath();
@@ -84,11 +83,11 @@
}
@Override
- public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
throws HyracksDataException {
buildClient();
String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- IExternalFilePrinter printer = printerFactory.createPrinter();
+ IExternalPrinter printer = printerFactory.createPrinter();
IWarningCollector warningCollector = context.getWarningCollector();
return new S3ExternalFileWriter(printer, cloudClient, bucket, staticPath == null, warningCollector,
pathSourceLocation);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
index a3a2f70..4166dde 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriter.java
@@ -25,19 +25,19 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IValueReference;
final class LocalFSExternalFileWriter implements IExternalFileWriter {
- private final IExternalFilePrinter printer;
+ private final IExternalPrinter printer;
private final ILocalFSValidator validator;
private final SourceLocation pathSourceLocation;
- LocalFSExternalFileWriter(IExternalFilePrinter printer, ILocalFSValidator validator,
+ LocalFSExternalFileWriter(IExternalPrinter printer, ILocalFSValidator validator,
SourceLocation pathSourceLocation) {
this.printer = printer;
this.validator = validator;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index 313757a..73f34f1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -23,11 +23,11 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -36,18 +36,17 @@
public final class LocalFSExternalFileWriterFactory implements IExternalFileWriterFactory {
private static final long serialVersionUID = 871685327574547749L;
private static final char SEPARATOR = File.separatorChar;
- public static final IExternalFileFilterWriterFactoryProvider PROVIDER =
- new IExternalFileFilterWriterFactoryProvider() {
- @Override
- public IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration) {
- return new LocalFSExternalFileWriterFactory(configuration);
- }
+ public static final IExternalFileWriterFactoryProvider PROVIDER = new IExternalFileWriterFactoryProvider() {
+ @Override
+ public IExternalFileWriterFactory create(ExternalWriterConfiguration configuration) {
+ return new LocalFSExternalFileWriterFactory(configuration);
+ }
- @Override
- public char getSeparator() {
- return SEPARATOR;
- }
- };
+ @Override
+ public char getSeparator() {
+ return SEPARATOR;
+ }
+ };
private static final ILocalFSValidator NO_OP_VALIDATOR = LocalFSExternalFileWriterFactory::noOpValidation;
private static final ILocalFSValidator VALIDATOR = LocalFSExternalFileWriterFactory::validate;
private final SourceLocation pathSourceLocation;
@@ -55,7 +54,7 @@
private final String staticPath;
private boolean validated;
- private LocalFSExternalFileWriterFactory(ExternalFileWriterConfiguration externalConfig) {
+ private LocalFSExternalFileWriterFactory(ExternalWriterConfiguration externalConfig) {
pathSourceLocation = externalConfig.getPathSourceLocation();
singleNodeCluster = externalConfig.isSingleNodeCluster();
staticPath = externalConfig.getStaticPath();
@@ -63,7 +62,7 @@
}
@Override
- public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
throws HyracksDataException {
ILocalFSValidator validator = VALIDATOR;
if (staticPath != null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
index 8f8c63a..57e7b58 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinter.java
@@ -22,12 +22,12 @@
import java.io.PrintStream;
import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
-final class TextualExternalFilePrinter implements IExternalFilePrinter {
+final class TextualExternalFilePrinter implements IExternalPrinter {
private final IPrinter printer;
private final IExternalFileCompressStreamFactory compressStreamFactory;
private TextualOutputStreamDelegate delegate;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
index 6778532..e3d0a66 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFilePrinterFactory.java
@@ -19,23 +19,21 @@
package org.apache.asterix.external.writer.printer;
import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinter;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
-public class TextualExternalFilePrinterFactory implements IExternalFilePrinterFactory {
- private static final long serialVersionUID = 9155959967258587588L;
- private final IPrinterFactory printerFactory;
+public class TextualExternalFilePrinterFactory extends TextualExternalPrinterFactory {
+ private static final long serialVersionUID = 8971234908711234L;
private final IExternalFileCompressStreamFactory compressStreamFactory;
public TextualExternalFilePrinterFactory(IPrinterFactory printerFactory,
IExternalFileCompressStreamFactory compressStreamFactory) {
- this.printerFactory = printerFactory;
+ super(printerFactory);
this.compressStreamFactory = compressStreamFactory;
}
@Override
- public IExternalFilePrinter createPrinter() {
+ public IExternalPrinter createPrinter() {
return new TextualExternalFilePrinter(printerFactory.createPrinter(), compressStreamFactory);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
new file mode 100644
index 0000000..537af2e
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+final class TextualExternalPrinter implements IExternalPrinter {
+ private final IPrinter printer;
+ private TextualOutputStreamDelegate delegate;
+ private PrintStream printStream;
+
+ TextualExternalPrinter(IPrinter printer) {
+ this.printer = printer;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.init();
+ }
+
+ @Override
+ public void newStream(OutputStream outputStream) {
+ delegate = new TextualOutputStreamDelegate(outputStream);
+ printStream = new PrintStream(delegate);
+ }
+
+ @Override
+ public void print(IValueReference value) throws HyracksDataException {
+ printer.print(value.getByteArray(), value.getStartOffset(), value.getLength(), printStream);
+ delegate.checkError();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (printStream != null) {
+ printStream.close();
+ printStream = null;
+ delegate.checkError();
+ delegate = null;
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
new file mode 100644
index 0000000..d779793c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalPrinterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+public class TextualExternalPrinterFactory implements IExternalPrinterFactory {
+ private static final long serialVersionUID = 9155959967258587588L;
+ protected final IPrinterFactory printerFactory;
+
+ public TextualExternalPrinterFactory(IPrinterFactory printerFactory) {
+ this.printerFactory = printerFactory;
+ }
+
+ @Override
+ public IExternalPrinter createPrinter() {
+ return new TextualExternalPrinter(printerFactory.createPrinter());
+ }
+}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
index 2520755..599d528 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyToStatement.java
@@ -43,7 +43,7 @@
private final Map<Integer, VariableExpr> partitionsVariables;
private final List<OrderbyClause.OrderModifier> orderByModifiers;
private final List<OrderbyClause.NullOrderModifier> orderByNullModifierList;
- private final List<Expression> keyExpressions;
+ private List<Expression> keyExpressions;
private final boolean autogenerated;
private Namespace namespace;
private Query query;
@@ -89,7 +89,7 @@
this.orderByModifiers = orderByModifiers;
this.orderByNullModifierList = orderByNullModifierList;
this.varCounter = varCounter;
- this.keyExpressions = keyExpressions;
+ this.keyExpressions = keyExpressions != null ? keyExpressions : new ArrayList<>();
this.autogenerated = autogenerated;
if (pathExpressions.isEmpty()) {
@@ -214,6 +214,7 @@
topLevelExpressions.addAll(pathExpressions);
topLevelExpressions.addAll(partitionExpressions);
topLevelExpressions.addAll(orderByList);
+ topLevelExpressions.addAll(keyExpressions);
return topLevelExpressions;
}
@@ -231,15 +232,15 @@
return keyExpressions;
}
+ public void setKeyExpressions(List<Expression> keyExpressions) {
+ this.keyExpressions = keyExpressions;
+ }
+
public boolean isAutogenerated() {
return autogenerated;
}
- public boolean isSinkFileStore() {
+ public boolean isFileStoreSink() {
return keyExpressions.isEmpty() && !autogenerated;
}
-
- public boolean isSinkDatabaseWithKey() {
- return !keyExpressions.isEmpty() || autogenerated;
- }
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
index 8c7b915..3975425 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/AbstractInlineUdfsVisitor.java
@@ -302,6 +302,10 @@
changed |= order.first;
stmtCopy.setOrderByList(order.second);
+ Pair<Boolean, List<Expression>> key = inlineUdfsInExprList(stmtCopy.getKeyExpressions());
+ changed |= key.first;
+ stmtCopy.setKeyExpressions(key.second);
+
return changed;
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 52e2678..6151b02 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -573,12 +573,10 @@
cto.getSourceVariable().accept(this, step);
out.println();
- if (cto.isSinkFileStore()) {
+ if (cto.isFileStoreSink()) {
formatPrintCopyToFileStore(cto, step);
- } else if (cto.isSinkDatabaseWithKey()) {
- formatPrintCopyToDatabaseWithKey(cto, step);
} else {
- throw new IllegalStateException("NYI: This should never happen");
+ formatPrintCopyToDatabaseWithKey(cto, step);
}
out.println("with ");
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
index e3e2484..a7e1f90 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppExpressionScopingVisitor.java
@@ -437,6 +437,9 @@
// Visit path exprs
stmtCopy.setPathExpressions(visit(stmtCopy.getPathExpressions(), stmtCopy));
+ // Visit key exprs
+ stmtCopy.setKeyExpressions(visit(stmtCopy.getKeyExpressions(), stmtCopy));
+
return null;
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
index f3d2675..847393c 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/base/AbstractSqlppSimpleExpressionVisitor.java
@@ -364,6 +364,7 @@
stmtCopy.setPathExpressions(visit(stmtCopy.getPathExpressions(), arg));
stmtCopy.setPartitionExpressions(visit(stmtCopy.getPartitionExpressions(), arg));
stmtCopy.setOrderByList(visit(stmtCopy.getOrderByList(), arg));
+ stmtCopy.setKeyExpressions(visit(stmtCopy.getKeyExpressions(), arg));
return null;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8d8ca80..340eb0a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -106,9 +106,9 @@
import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalWriterFactory;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -761,8 +761,8 @@
fileWriterFactory.validate();
String fileExtension = ExternalWriterProvider.getFileExtension(sink);
int maxResult = ExternalWriterProvider.getMaxResult(sink);
- IExternalFilePrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
- ExternalWriterFactory writerFactory = new ExternalWriterFactory(fileWriterFactory, printerFactory,
+ IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
+ ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
partitionComparatorFactories, inputDesc, writerFactory);
@@ -770,6 +770,13 @@
}
@Override
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+ IScalarEvaluatorFactory[] keyEvaluatorFactories, boolean autogenerated, IWriteDataSink sink,
+ RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9142556..2d77a94 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -30,17 +30,17 @@
import org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
import org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
-import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
-import org.apache.asterix.runtime.writer.IExternalFileFilterWriterFactoryProvider;
-import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
+import org.apache.asterix.runtime.writer.ExternalWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
+import org.apache.asterix.runtime.writer.IExternalFileWriterFactoryProvider;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.hyracks.algebricks.core.algebra.metadata.IWriteDataSink;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.control.cc.ClusterControllerService;
public class ExternalWriterProvider {
- private static final Map<String, IExternalFileFilterWriterFactoryProvider> CREATOR_MAP;
+ private static final Map<String, IExternalFileWriterFactoryProvider> CREATOR_MAP;
private static final Map<String, IExternalFileCompressStreamFactory> STREAM_COMPRESSORS;
private ExternalWriterProvider() {
@@ -59,7 +59,7 @@
public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
String staticPath, SourceLocation pathExpressionLocation) {
String adapterName = sink.getAdapterName().toLowerCase();
- IExternalFileFilterWriterFactoryProvider creator = CREATOR_MAP.get(adapterName);
+ IExternalFileWriterFactoryProvider creator = CREATOR_MAP.get(adapterName);
if (creator == null) {
throw new UnsupportedOperationException("Unsupported adapter " + adapterName);
@@ -83,12 +83,12 @@
return Integer.parseInt(maxResultString);
}
- private static ExternalFileWriterConfiguration createConfiguration(ICcApplicationContext appCtx,
- IWriteDataSink sink, String staticPath, SourceLocation pathExpressionLocation) {
+ private static ExternalWriterConfiguration createConfiguration(ICcApplicationContext appCtx, IWriteDataSink sink,
+ String staticPath, SourceLocation pathExpressionLocation) {
Map<String, String> params = sink.getConfiguration();
boolean singleNodeCluster = isSingleNodeCluster(appCtx);
- return new ExternalFileWriterConfiguration(params, pathExpressionLocation, staticPath, singleNodeCluster);
+ return new ExternalWriterConfiguration(params, pathExpressionLocation, staticPath, singleNodeCluster);
}
private static boolean isSingleNodeCluster(ICcApplicationContext appCtx) {
@@ -96,8 +96,8 @@
return ccs.getNodeManager().getIpAddressNodeNameMap().size() == 1;
}
- private static void addCreator(String adapterName, IExternalFileFilterWriterFactoryProvider creator) {
- IExternalFileFilterWriterFactoryProvider registeredCreator = CREATOR_MAP.get(adapterName.toLowerCase());
+ private static void addCreator(String adapterName, IExternalFileWriterFactoryProvider creator) {
+ IExternalFileWriterFactoryProvider registeredCreator = CREATOR_MAP.get(adapterName.toLowerCase());
if (registeredCreator != null) {
throw new IllegalStateException(
"Adapter " + adapterName + " is registered to " + registeredCreator.getClass().getName());
@@ -105,7 +105,7 @@
CREATOR_MAP.put(adapterName.toLowerCase(), creator);
}
- public static IExternalFilePrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
+ public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
Map<String, String> configuration = sink.getConfiguration();
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -131,7 +131,7 @@
}
public static char getSeparator(String adapterName) {
- IExternalFileFilterWriterFactoryProvider creator = CREATOR_MAP.get(adapterName.toLowerCase());
+ IExternalFileWriterFactoryProvider creator = CREATOR_MAP.get(adapterName.toLowerCase());
if (creator == null) {
throw new UnsupportedOperationException("Unsupported adapter " + adapterName);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index 7105efa..c6d1dbe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -54,7 +54,7 @@
@Override
public String getPartitionDirectory(IFrameTupleReference tuple) throws HyracksDataException {
if (!appendPrefix(tuple)) {
- return ExternalWriter.UNRESOLVABLE_PATH;
+ return ExternalFileWriter.UNRESOLVABLE_PATH;
}
if (dirStringBuilder.length() > 0 && dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
similarity index 94%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index 5fc07af..f9f98da 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-final class ExternalWriter implements IExternalWriter {
+final class ExternalFileWriter implements IExternalWriter {
static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
private final IPathResolver pathResolver;
private final IExternalFileWriter writer;
@@ -31,7 +31,7 @@
private String partitionPath;
private int tupleCounter;
- public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
+ public ExternalFileWriter(IPathResolver pathResolver, IExternalFileWriter writer, int maxResultPerFile) {
this.pathResolver = pathResolver;
this.writer = writer;
this.maxResultPerFile = maxResultPerFile;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
similarity index 90%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
index e7c0db0..5981584 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -28,17 +28,17 @@
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
-public class ExternalWriterFactory implements IExternalWriterFactory {
+public class ExternalFileWriterFactory implements IExternalWriterFactory {
private static final long serialVersionUID = 1412969574113419638L;
private final IExternalFileWriterFactory writerFactory;
- private final IExternalFilePrinterFactory printerFactory;
+ private final IExternalPrinterFactory printerFactory;
private final String fileExtension;
private final int maxResult;
private final IScalarEvaluatorFactory pathEvalFactory;
private final String staticPath;
private final SourceLocation pathSourceLocation;
- public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, IExternalFilePrinterFactory printerFactory,
+ public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory, IExternalPrinterFactory printerFactory,
String fileExtension, int maxResult, IScalarEvaluatorFactory pathEvalFactory, String staticPath,
SourceLocation pathSourceLocation) {
this.writerFactory = writerFactory;
@@ -65,6 +65,6 @@
resolver = new StaticPathResolver(fileExtension, fileSeparator, partition, staticPath);
}
IExternalFileWriter writer = writerFactory.createWriter(context, printerFactory);
- return new ExternalWriter(resolver, writer, maxResult);
+ return new ExternalFileWriter(resolver, writer, maxResult);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
similarity index 91%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
index b62a07a..fbb05ee 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterConfiguration.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterConfiguration.java
@@ -22,13 +22,13 @@
import org.apache.hyracks.api.exceptions.SourceLocation;
-public final class ExternalFileWriterConfiguration {
+public final class ExternalWriterConfiguration {
private final Map<String, String> configuration;
private final SourceLocation pathSourceLocation;
private final String staticPath;
private final boolean singleNodeCluster;
- public ExternalFileWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
+ public ExternalWriterConfiguration(Map<String, String> configuration, SourceLocation pathSourceLocation,
String staticPath, boolean singleNodeCluster) {
this.configuration = configuration;
this.pathSourceLocation = pathSourceLocation;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
index d8f1f84..31dba79 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactory.java
@@ -20,15 +20,14 @@
import java.io.Serializable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* An interface for writing to a storage device
- * Implementer should also provide a singleton to {@link IExternalFileFilterWriterFactoryProvider}
+ * Implementer should also provide a singleton to {@link IExternalFileWriterFactoryProvider}
*/
-public interface IExternalFileWriterFactory extends Serializable {
+public interface IExternalFileWriterFactory extends IExternalWriterFactoryValidator, Serializable {
/**
* Create a writer
*
@@ -36,16 +35,11 @@
* @param printerFactory printer factory for writing the final result
* @return a new file writer
*/
- IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalFilePrinterFactory printerFactory)
+ IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
throws HyracksDataException;
/**
* @return file (or path) separator
*/
char getSeparator();
-
- /**
- * Validate the writer by running a test write routine to ensure the writer has the appropriate permissions
- */
- void validate() throws AlgebricksException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactoryProvider.java
similarity index 85%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactoryProvider.java
index 7a863f7..eabd2cb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileFilterWriterFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFileWriterFactoryProvider.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.runtime.writer;
-public interface IExternalFileFilterWriterFactoryProvider {
- IExternalFileWriterFactory create(ExternalFileWriterConfiguration configuration);
+public interface IExternalFileWriterFactoryProvider {
+ IExternalFileWriterFactory create(ExternalWriterConfiguration configuration);
char getSeparator();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
similarity index 97%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
index ba5fa1d..54fd152 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinter.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinter.java
@@ -26,7 +26,7 @@
/**
* An {@link IExternalFileWriter} printer
*/
-public interface IExternalFilePrinter {
+public interface IExternalPrinter {
/**
* Open the printer
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
similarity index 86%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
index a4fa97b..4d9352a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalPrinterFactory.java
@@ -23,9 +23,9 @@
/**
* {@link IExternalFileWriter} printer factory
*/
-public interface IExternalFilePrinterFactory extends Serializable {
+public interface IExternalPrinterFactory extends Serializable {
/**
- * @return a new external file printer
+ * @return a new external printer
*/
- IExternalFilePrinter createPrinter();
+ IExternalPrinter createPrinter();
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
similarity index 76%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
index a4fa97b..4a75db6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
@@ -18,14 +18,11 @@
*/
package org.apache.asterix.runtime.writer;
-import java.io.Serializable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
+public interface IExternalWriterFactoryValidator {
/**
- * @return a new external file printer
+ * Perform the necessary validation to ensure the writer has the proper permissions
*/
- IExternalFilePrinter createPrinter();
+ void validate() throws AlgebricksException;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 2072dee..5240e0c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -65,6 +65,10 @@
SourceLocation pathSourceLocation, IWriteDataSink sink, RecordDescriptor inputDesc, Object sourceType)
throws AlgebricksException;
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteDatabaseWithKeyRuntime(int sourceColumn,
+ IScalarEvaluatorFactory[] keyEvaluatorFactories, boolean autogenerated, IWriteDataSink sink,
+ RecordDescriptor inputDesc, Object sourceType) throws AlgebricksException;
+
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, int[] printColumns,
IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
index 7eef90e..89a5148 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/WriteOperator.java
@@ -41,17 +41,21 @@
private final Mutable<ILogicalExpression> pathExpression;
private final List<Mutable<ILogicalExpression>> partitionExpressions;
private final List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions;
+ private final List<Mutable<ILogicalExpression>> keyExpressions;
+ private final boolean autogenerated;
private final IWriteDataSink writeDataSink;
public WriteOperator(Mutable<ILogicalExpression> sourceExpression, Mutable<ILogicalExpression> pathExpression,
List<Mutable<ILogicalExpression>> partitionExpressions,
List<Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>> orderExpressions,
- IWriteDataSink writeDataSink) {
+ List<Mutable<ILogicalExpression>> keyExpressions, boolean autogenerated, IWriteDataSink writeDataSink) {
this.sourceExpression = sourceExpression;
this.pathExpression = pathExpression;
this.partitionExpressions = partitionExpressions;
this.orderExpressions = orderExpressions;
this.writeDataSink = writeDataSink;
+ this.keyExpressions = keyExpressions;
+ this.autogenerated = autogenerated;
}
public Mutable<ILogicalExpression> getSourceExpression() {
@@ -74,6 +78,18 @@
return orderExpressions;
}
+ public List<Mutable<ILogicalExpression>> getKeyExpressions() {
+ return keyExpressions;
+ }
+
+ public List<LogicalVariable> getKeyVariables() {
+ List<LogicalVariable> keyVariables = new ArrayList<>();
+ for (Mutable<ILogicalExpression> keyExpression : keyExpressions) {
+ keyVariables.add(VariableUtilities.getVariable(keyExpression.getValue()));
+ }
+ return keyVariables;
+ }
+
public List<LogicalVariable> getPartitionVariables() {
List<LogicalVariable> partitionVariables = new ArrayList<>();
for (Mutable<ILogicalExpression> partitionExpression : partitionExpressions) {
@@ -92,10 +108,18 @@
return orderColumns;
}
+ public boolean getAutogenerated() {
+ return autogenerated;
+ }
+
public IWriteDataSink getWriteDataSink() {
return writeDataSink;
}
+ public boolean isFileStoreSink() {
+ return keyExpressions.isEmpty() && !autogenerated;
+ }
+
@Override
public LogicalOperatorTag getOperatorTag() {
return LogicalOperatorTag.WRITE;
@@ -119,6 +143,10 @@
changed |= visitor.transform(orderExpressionPair.second);
}
+ for (Mutable<ILogicalExpression> expression : keyExpressions) {
+ changed |= visitor.transform(expression);
+ }
+
return changed;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fa47ae5..8664acf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -297,9 +297,11 @@
deepCopyExpressionRefs(new ArrayList<>(), op.getPartitionExpressions());
List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrderPairExpressions =
deepCopyOrderAndExpression(op.getOrderExpressions());
+ List<Mutable<ILogicalExpression>> newKeyPairExpressions =
+ deepCopyExpressionRefs(new ArrayList<>(), op.getKeyExpressions());
IWriteDataSink writeDataSink = op.getWriteDataSink().createCopy();
return new WriteOperator(newSourceExpression, newPathExpression, newPartitionExpressions,
- newOrderPairExpressions, writeDataSink);
+ newOrderPairExpressions, newKeyPairExpressions, op.getAutogenerated(), writeDataSink);
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index d0b0608..50401c3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -389,6 +389,7 @@
for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : op.getOrderExpressions()) {
substUsedVariablesInExpr(orderExpr.second, pair.first, pair.second);
}
+ substUsedVariablesInExpr(op.getKeyExpressions(), pair.first, pair.second);
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index d7b2555..7186f7e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -362,6 +362,10 @@
for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : op.getOrderExpressions()) {
orderExpr.second.getValue().getUsedVariables(usedVariables);
}
+
+ for (Mutable<ILogicalExpression> expr : op.getKeyExpressions()) {
+ expr.getValue().getUsedVariables(usedVariables);
+ }
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index d462cd5..71f5876 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
@@ -61,12 +62,16 @@
private final LogicalVariable sourceVariable;
private final List<LogicalVariable> partitionVariables;
private final List<OrderColumn> orderColumns;
+ private final List<LogicalVariable> keyVariables;
+ private final boolean autogenerated;
public SinkWritePOperator(LogicalVariable sourceVariable, List<LogicalVariable> partitionVariables,
- List<OrderColumn> orderColumns) {
+ List<OrderColumn> orderColumns, List<LogicalVariable> keyVariables, boolean autogenerated) {
this.sourceVariable = sourceVariable;
this.partitionVariables = partitionVariables;
this.orderColumns = orderColumns;
+ this.keyVariables = keyVariables;
+ this.autogenerated = autogenerated;
}
@Override
@@ -145,6 +150,16 @@
IBinaryComparatorFactory[] partitionComparatorFactories =
JobGenHelper.variablesToAscBinaryComparatorFactories(partitionVariables, typeEnv, context);
+ // Key expressions
+ IScalarEvaluatorFactory[] keyEvalFactories = new IScalarEvaluatorFactory[write.getKeyExpressions().size()];
+ List<Mutable<ILogicalExpression>> keyExpressions = write.getKeyExpressions();
+ if (!keyExpressions.isEmpty()) {
+ for (int i = 0; i < keyExpressions.size(); i++) {
+ ILogicalExpression keyExpr = keyExpressions.get(i).getValue();
+ keyEvalFactories[i] = runtimeProvider.createEvaluatorFactory(keyExpr, typeEnv, inputSchemas, context);
+ }
+ }
+
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
@@ -152,9 +167,17 @@
IMetadataProvider<?, ?> mp = context.getMetadataProvider();
- Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getWriteFileRuntime(
- sourceColumn, partitionColumns, partitionComparatorFactories, dynamicPathEvalFactory, staticPathExpr,
- pathExpr.getSourceLocation(), writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
+ Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints;
+ if (write.isFileStoreSink()) {
+ runtimeAndConstraints = mp.getWriteFileRuntime(sourceColumn, partitionColumns, partitionComparatorFactories,
+ dynamicPathEvalFactory, staticPathExpr, pathExpr.getSourceLocation(), writeDataSink, inputDesc,
+ typeEnv.getVarType(sourceVariable));
+
+ } else {
+ runtimeAndConstraints = mp.getWriteDatabaseWithKeyRuntime(sourceColumn, keyEvalFactories, autogenerated,
+ writeDataSink, inputDesc, typeEnv.getVarType(sourceVariable));
+ }
+
IPushRuntimeFactory runtime = runtimeAndConstraints.first;
runtime.setSourceLocation(write.getSourceLocation());
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index f945994..3ea2631 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -408,7 +408,9 @@
}
ensureAllVariables(op.getPartitionExpressions(), v -> v);
ensureAllVariables(op.getOrderExpressions(), Pair::getSecond);
- return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns());
+ ensureAllVariables(op.getKeyExpressions(), v -> v);
+ return new SinkWritePOperator(op.getSourceVariable(), op.getPartitionVariables(), op.getOrderColumns(),
+ op.getKeyVariables(), op.getAutogenerated());
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
similarity index 70%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
copy to hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
index a4fa97b..cb72aec 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalFilePrinterFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
@@ -16,16 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.writer;
+package org.apache.hyracks.algebricks.runtime.operators.writer;
-import java.io.Serializable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-/**
- * {@link IExternalFileWriter} printer factory
- */
-public interface IExternalFilePrinterFactory extends Serializable {
- /**
- * @return a new external file printer
- */
- IExternalFilePrinter createPrinter();
-}
+interface IWriterPartitioner {
+ boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException;
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
new file mode 100644
index 0000000..3cb44ff
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+class KeyWriterPartitioner implements IWriterPartitioner {
+ public static final IWriterPartitioner INSTANCE = new KeyWriterPartitioner();
+
+ private KeyWriterPartitioner() {
+ }
+
+ @Override
+ public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException {
+ // Every key is a partition
+ return true;
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
new file mode 100644
index 0000000..3221d73
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+class NoOpWriterPartitioner implements IWriterPartitioner {
+ private boolean first = true;
+
+ public NoOpWriterPartitioner() {
+ }
+
+ @Override
+ public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException {
+ if (first) {
+ first = false;
+ return true;
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
index 01e137b..9407c08 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -23,44 +23,30 @@
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
-import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
private final int sourceColumn;
- private final int[] partitionColumns;
+ private final IWriterPartitioner partitioner;
private final IPointable sourceValue;
- private final PointableTupleReference partitionColumnsPrevCopy;
- private final PermutingFrameTupleReference partitionColumnsRef;
- private final IBinaryComparator[] partitionComparators;
private final IExternalWriter writer;
private FrameTupleAccessor tupleAccessor;
private FrameTupleReference tupleRef;
- private boolean first;
private IFrameWriter frameWriter;
- SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, IBinaryComparator[] partitionComparators,
- RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+ SinkExternalWriterRuntime(int sourceColumn, IWriterPartitioner partitioner, RecordDescriptor inputRecordDesc,
+ IExternalWriter writer) {
this.sourceColumn = sourceColumn;
- this.partitionColumns = partitionColumns;
+ this.partitioner = partitioner;
this.sourceValue = new VoidPointable();
- partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
- partitionColumnsPrevCopy =
- PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
- this.partitionComparators = partitionComparators;
this.inputRecordDesc = inputRecordDesc;
this.writer = writer;
- first = true;
}
@Override
@@ -83,8 +69,6 @@
}
setValue(tupleRef, sourceColumn, sourceValue);
writer.write(sourceValue);
- partitionColumnsRef.reset(tupleAccessor, i);
- partitionColumnsPrevCopy.set(partitionColumnsRef);
}
}
@@ -106,13 +90,7 @@
}
private boolean isNewPartition(int index) throws HyracksDataException {
- if (first) {
- first = false;
- return true;
- }
-
- return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor, index, partitionColumns,
- partitionComparators);
+ return partitioner.isNewPartition(tupleAccessor, index);
}
private void setValue(IFrameTupleReference tuple, int column, IPointable value) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
index 6220dec..321828f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -31,17 +31,30 @@
public final class SinkExternalWriterRuntimeFactory extends AbstractPushRuntimeFactory {
private static final long serialVersionUID = -2215789207336628581L;
private final int sourceColumn;
- private final int[] partitionColumn;
+ private final int[] partitionColumns;
private final IBinaryComparatorFactory[] partitionComparatorFactories;
+ private final boolean partitionByKey;
private final RecordDescriptor inputRecordDescriptor;
private final IExternalWriterFactory writerFactory;
- public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumn,
+ public SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumns,
IBinaryComparatorFactory[] partitionComparatorFactories, RecordDescriptor inputRecordDescriptor,
IExternalWriterFactory writerFactory) {
+ this(sourceColumn, partitionColumns, partitionComparatorFactories, false, inputRecordDescriptor, writerFactory);
+ }
+
+ public SinkExternalWriterRuntimeFactory(int sourceColumn, RecordDescriptor inputRecordDescriptor,
+ IExternalWriterFactory writerFactory) {
+ this(sourceColumn, null, null, true, inputRecordDescriptor, writerFactory);
+ }
+
+ private SinkExternalWriterRuntimeFactory(int sourceColumn, int[] partitionColumns,
+ IBinaryComparatorFactory[] partitionComparatorFactories, boolean partitionByKey,
+ RecordDescriptor inputRecordDescriptor, IExternalWriterFactory writerFactory) {
this.sourceColumn = sourceColumn;
- this.partitionColumn = partitionColumn;
+ this.partitionColumns = partitionColumns;
this.partitionComparatorFactories = partitionComparatorFactories;
+ this.partitionByKey = partitionByKey;
this.inputRecordDescriptor = inputRecordDescriptor;
this.writerFactory = writerFactory;
}
@@ -49,12 +62,33 @@
@Override
public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
IExternalWriter writer = writerFactory.createWriter(ctx);
- IBinaryComparator[] partitionComparators = new IBinaryComparator[partitionComparatorFactories.length];
- for (int i = 0; i < partitionComparatorFactories.length; i++) {
- partitionComparators[i] = partitionComparatorFactories[i].createBinaryComparator();
- }
- SinkExternalWriterRuntime runtime = new SinkExternalWriterRuntime(sourceColumn, partitionColumn,
- partitionComparators, inputRecordDescriptor, writer);
+ SinkExternalWriterRuntime runtime =
+ new SinkExternalWriterRuntime(sourceColumn, createPartitioner(), inputRecordDescriptor, writer);
return new IPushRuntime[] { runtime };
}
+
+ /**
+ * Creates the writer partitioner based on the provided parameters
+ *
+ * @return writer partitioner
+ */
+ private IWriterPartitioner createPartitioner() {
+ // key writer partitioner
+ if (partitionByKey) {
+ return KeyWriterPartitioner.INSTANCE;
+ }
+
+ // writer partitioner
+ if (partitionColumns.length > 0) {
+ IBinaryComparator[] comparators = new IBinaryComparator[partitionComparatorFactories.length];
+ for (int i = 0; i < partitionComparatorFactories.length; i++) {
+ comparators[i] = partitionComparatorFactories[i].createBinaryComparator();
+ }
+
+ return new WriterPartitioner(partitionColumns, comparators);
+ }
+
+ // no-op partitioner
+ return new NoOpWriterPartitioner();
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
new file mode 100644
index 0000000..468ca4a
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+class WriterPartitioner implements IWriterPartitioner {
+ private final int[] partitionColumns;
+ private final IBinaryComparator[] partitionComparators;
+ private final PointableTupleReference partitionColumnsPrevCopy;
+ private final PermutingFrameTupleReference partitionColumnsRef;
+ private boolean first = true;
+
+ public WriterPartitioner(int[] partitionColumns, IBinaryComparator[] partitionComparators) {
+ this.partitionColumns = partitionColumns;
+ this.partitionComparators = partitionComparators;
+ partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
+ partitionColumnsPrevCopy =
+ PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
+ }
+
+ @Override
+ public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws HyracksDataException {
+ if (first) {
+ first = false;
+ partitionColumnsRef.reset(tupleAccessor, index);
+ partitionColumnsPrevCopy.set(partitionColumnsRef);
+ return true;
+ }
+
+ boolean newPartition = !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor, index,
+ partitionColumns, partitionComparators);
+
+ // Set previous
+ partitionColumnsRef.reset(tupleAccessor, index);
+ partitionColumnsPrevCopy.set(partitionColumnsRef);
+
+ return newPartition;
+ }
+}
\ No newline at end of file