[NO ISSUE][COMP] Support copy statement with csv files
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
With this change, we add support for COPY statement with csv files.
Users need to provide inline types with csv files when using copy
statement with csv files for example:
COPY orders AS (
id int,
details string,
date string
)
USING localfs
(("path"="asterix_nc1:///csv/sample_10.csv"),("format"="csv"),("header"="False"));
Change-Id: I7ac452559ab02e35f5b5fa84fbd0853a08b2bc86
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17739
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 4981f0e..3733970 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -27,6 +27,7 @@
import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Index;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -290,13 +291,15 @@
implements ICompiledDmlStatement {
private final DataverseName dataverseName;
private final String datasetName;
+ private final Datatype itemType;
private final String adapter;
private final Map<String, String> properties;
- public CompiledCopyFromFileStatement(DataverseName dataverseName, String datasetName, String adapter,
- Map<String, String> properties) {
+ public CompiledCopyFromFileStatement(DataverseName dataverseName, String datasetName, Datatype itemType,
+ String adapter, Map<String, String> properties) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
+ this.itemType = itemType;
this.adapter = adapter;
this.properties = properties;
}
@@ -319,6 +322,10 @@
return properties;
}
+ public Datatype getItemType() {
+ return itemType;
+ }
+
@Override
public Statement.Kind getKind() {
return Statement.Kind.COPY;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 52cddc5..f1a1398 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -224,9 +224,9 @@
((CompiledLoadFromFileStatement) stmt).getAdapter(),
((CompiledLoadFromFileStatement) stmt).getProperties());
} else if (stmt.getKind() == Statement.Kind.COPY) {
- lds = new LoadableDataSource(dataset, itemType, metaItemType,
- ((CompiledCopyFromFileStatement) stmt).getAdapter(),
- ((CompiledCopyFromFileStatement) stmt).getProperties());
+ CompiledCopyFromFileStatement copyStmt = (CompiledCopyFromFileStatement) stmt;
+ lds = new LoadableDataSource(dataset, copyStmt.getItemType().getDatatype(), metaItemType,
+ copyStmt.getAdapter(), copyStmt.getProperties());
} else {
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Unrecognized Statement Type",
stmt.getKind());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d41b549..7cf7938 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3504,9 +3504,22 @@
try {
metadataProvider.setWriteTransaction(true);
Dataset dataset = metadataProvider.findDataset(dataverseName, copyStmt.getDatasetName());
+ if (dataset == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, stmt.getSourceLocation(),
+ datasetName, dataverseName);
+ }
Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(),
dataset.getItemTypeName());
-
+ // Copy statement with csv files will have a type expression
+ if (copyStmt.getTypeExpr() != null) {
+ TypeExpression itemTypeExpr = copyStmt.getTypeExpr();
+ Triple<DataverseName, String, Boolean> itemTypeQualifiedName = extractDatasetItemTypeName(dataverseName,
+ datasetName, itemTypeExpr, false, stmt.getSourceLocation());
+ DataverseName itemTypeDataverseName = itemTypeQualifiedName.first;
+ String itemTypeName = itemTypeQualifiedName.second;
+ IAType itemTypeEntity = translateType(itemTypeDataverseName, itemTypeName, itemTypeExpr, mdTxnCtx);
+ itemType = new Datatype(itemTypeDataverseName, itemTypeName, itemTypeEntity, true);
+ }
ExternalDetailsDecl externalDetails = copyStmt.getExternalDetails();
Map<String, String> properties =
createExternalDataPropertiesForCopyStmt(dataverseName, copyStmt, itemType, mdTxnCtx);
@@ -3515,7 +3528,7 @@
validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
appCtx);
CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(dataverseName,
- copyStmt.getDatasetName(), externalDetails.getAdapter(), properties);
+ copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
cls.setSourceLocation(stmt.getSourceLocation());
JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
null, responsePrinter, warningCollector, null);
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java
index 3baf81a..2a1a51a 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java
@@ -22,6 +22,7 @@
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.lang.common.base.AbstractStatement;
import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.expression.TypeExpression;
import org.apache.asterix.lang.common.util.ExpressionUtils;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.object.base.AdmObjectNode;
@@ -30,13 +31,15 @@
private DataverseName dataverseName;
private String datasetName;
+ private TypeExpression typeExpr;
private ExternalDetailsDecl externalDetails;
private AdmObjectNode withObjectNode;
- public CopyStatement(DataverseName dataverseName, String datasetName, ExternalDetailsDecl externalDetails,
- RecordConstructor withRecord) throws CompilationException {
+ public CopyStatement(DataverseName dataverseName, String datasetName, TypeExpression typeExpr,
+ ExternalDetailsDecl externalDetails, RecordConstructor withRecord) throws CompilationException {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
+ this.typeExpr = typeExpr;
this.externalDetails = externalDetails;
this.withObjectNode = withRecord == null ? new AdmObjectNode() : ExpressionUtils.toNode(withRecord);
}
@@ -87,4 +90,8 @@
public byte getCategory() {
return Category.UPDATE;
}
+
+ public TypeExpression getTypeExpr() {
+ return typeExpr;
+ }
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 9e3dea4..a7dc2d5 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -2730,6 +2730,7 @@
Token startToken = null;
DataverseName dataverseName = null;
Identifier datasetName = null;
+ TypeExpression typeExpr = null;
boolean alreadySorted = false;
String adapterName;
Map<String,String> properties;
@@ -2741,13 +2742,14 @@
dataverseName = nameComponents.first;
datasetName = nameComponents.second;
}
+ (<AS> typeExpr = DatasetTypeSpecification())?
<USING> adapterName = AdapterName() properties = Configuration()
{
ExternalDetailsDecl edd = new ExternalDetailsDecl();
edd.setAdapter(adapterName);
edd.setProperties(properties);
try {
- CopyStatement stmt = new CopyStatement(dataverseName, datasetName.getValue(), edd, null);
+ CopyStatement stmt = new CopyStatement(dataverseName, datasetName.getValue(), typeExpr, edd, null);
return addSourceLocation(stmt, startToken);
} catch (CompilationException e){
throw new SqlppParseException(getSourceLocation(startToken), e.getMessage());