[ASTERIXDB-3170][COMP] Support COPY Statement
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
With this change, we add support for the COPY statement, which can be
used to upsert data into a dataset from an external data source.
Change-Id: I612978472f090ab3c32e901aa37087ed5b7edf92
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17499
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
index fea9340..0116576 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
@@ -51,10 +51,10 @@
*
* @param stmt,
* the compiled load statement.
- * @return a logical query plan for the load statement.
+ * @return a logical query plan for the Copy/Load statement.
* @throws AlgebricksException
*/
- public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws AlgebricksException;
+ public ILogicalPlan translateCopyOrLoad(ICompiledDmlStatement stmt) throws AlgebricksException;
/**
* @return the current minimum available variable id.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 088676c..4981f0e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -286,6 +286,50 @@
}
}
+ public static class CompiledCopyFromFileStatement extends AbstractCompiledStatement
+ implements ICompiledDmlStatement {
+ private final DataverseName dataverseName;
+ private final String datasetName;
+ private final String adapter;
+ private final Map<String, String> properties;
+
+ public CompiledCopyFromFileStatement(DataverseName dataverseName, String datasetName, String adapter,
+ Map<String, String> properties) {
+ this.dataverseName = dataverseName;
+ this.datasetName = datasetName;
+ this.adapter = adapter;
+ this.properties = properties;
+ }
+
+ @Override
+ public DataverseName getDataverseName() {
+ return dataverseName;
+ }
+
+ @Override
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ public String getAdapter() {
+ return adapter;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public Statement.Kind getKind() {
+ return Statement.Kind.COPY;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Statement.Category.UPDATE;
+ }
+ }
+
public static class CompiledInsertStatement extends AbstractCompiledStatement implements ICompiledDmlStatement {
private final DataverseName dataverseName;
private final String datasetName;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index bd06729..52cddc5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -44,6 +44,7 @@
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Expression.Kind;
import org.apache.asterix.lang.common.base.ILangExpression;
+import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.clause.GroupbyClause;
import org.apache.asterix.lang.common.clause.LetClause;
import org.apache.asterix.lang.common.clause.LimitClause;
@@ -99,6 +100,7 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
@@ -194,15 +196,13 @@
return context.getVarCounter();
}
- @Override
- public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws AlgebricksException {
- CompiledLoadFromFileStatement clffs = (CompiledLoadFromFileStatement) stmt;
+ public ILogicalPlan translateCopyOrLoad(ICompiledDmlStatement stmt) throws AlgebricksException {
SourceLocation sourceLoc = stmt.getSourceLocation();
- Dataset dataset = metadataProvider.findDataset(clffs.getDataverseName(), clffs.getDatasetName());
+ Dataset dataset = metadataProvider.findDataset(stmt.getDataverseName(), stmt.getDatasetName());
if (dataset == null) {
// This would never happen since we check for this in AqlTranslator
- throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, clffs.getDatasetName(),
- clffs.getDataverseName());
+ throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, stmt.getDatasetName(),
+ stmt.getDataverseName());
}
IAType itemType = metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
IAType metaItemType =
@@ -219,7 +219,18 @@
LoadableDataSource lds;
try {
- lds = new LoadableDataSource(dataset, itemType, metaItemType, clffs.getAdapter(), clffs.getProperties());
+ if (stmt.getKind() == Statement.Kind.LOAD) {
+ lds = new LoadableDataSource(dataset, itemType, metaItemType,
+ ((CompiledLoadFromFileStatement) stmt).getAdapter(),
+ ((CompiledLoadFromFileStatement) stmt).getProperties());
+ } else if (stmt.getKind() == Statement.Kind.COPY) {
+ lds = new LoadableDataSource(dataset, itemType, metaItemType,
+ ((CompiledCopyFromFileStatement) stmt).getAdapter(),
+ ((CompiledCopyFromFileStatement) stmt).getProperties());
+ } else {
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Unrecognized Statement Type",
+ stmt.getKind());
+ }
} catch (IOException e) {
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, e.toString(), e);
}
@@ -258,14 +269,16 @@
assign.getInputs().add(new MutableObject<>(dssOp));
assign.setSourceLocation(sourceLoc);
- // If the input is pre-sorted, we set the ordering property explicitly in the
- // assign
- if (clffs.alreadySorted()) {
- List<OrderColumn> orderColumns = new ArrayList<>();
- for (int i = 0; i < pkVars.size(); ++i) {
- orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC));
+ if (stmt.getKind() == Statement.Kind.LOAD) {
+ // If the input is pre-sorted, we set the ordering property explicitly in the
+ // assign
+ if (((CompiledLoadFromFileStatement) stmt).alreadySorted()) {
+ List<OrderColumn> orderColumns = new ArrayList<>();
+ for (int i = 0; i < pkVars.size(); ++i) {
+ orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC));
+ }
+ assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns));
}
- assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns));
}
// Load does not support meta record now.
@@ -285,22 +298,49 @@
additionalFilteringAssign.setSourceLocation(sourceLoc);
}
- InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef,
- varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, true);
- insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
- insertOp.setSourceLocation(sourceLoc);
+ if (stmt.getKind() == Statement.Kind.LOAD) {
+ InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef,
+ varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, true);
+ insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+ insertOp.setSourceLocation(sourceLoc);
- if (additionalFilteringAssign != null) {
- additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
- insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+ if (additionalFilteringAssign != null) {
+ additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
+ insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+ } else {
+ insertOp.getInputs().add(new MutableObject<>(assign));
+ }
+
+ SinkOperator leafOperator = new SinkOperator();
+ leafOperator.getInputs().add(new MutableObject<>(insertOp));
+ leafOperator.setSourceLocation(sourceLoc);
+ return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
+ } else if (stmt.getKind() == Statement.Kind.COPY) {
+ InsertDeleteUpsertOperator upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef,
+ varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+ upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+ upsertOp.setSourceLocation(sourceLoc);
+
+ if (additionalFilteringAssign != null) {
+ additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
+ upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+ } else {
+ upsertOp.getInputs().add(new MutableObject<>(assign));
+ }
+ upsertOp.setOperationVar(context.newVar());
+ upsertOp.setOperationVarType(BuiltinType.AINT8);
+ // Create and add a new variable used for representing the original record
+ upsertOp.setPrevRecordVar(context.newVar());
+ upsertOp.setPrevRecordType(itemType);
+
+ DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(true));
+ delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
+ delegateOperator.setSourceLocation(sourceLoc);
+ return new ALogicalPlanImpl(new MutableObject<>(delegateOperator));
} else {
- insertOp.getInputs().add(new MutableObject<>(assign));
+ throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Unrecognized Statement Type",
+ stmt.getKind());
}
-
- SinkOperator leafOperator = new SinkOperator();
- leafOperator.getInputs().add(new MutableObject<>(insertOp));
- leafOperator.setSourceLocation(sourceLoc);
- return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 41be44b..afdcee4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -192,6 +192,7 @@
// establish facts
final boolean isQuery = query != null;
final boolean isLoad = statement != null && statement.getKind() == Statement.Kind.LOAD;
+ final boolean isCopy = statement != null && statement.getKind() == Statement.Kind.COPY;
final SourceLocation sourceLoc =
query != null ? query.getSourceLocation() : statement != null ? statement.getSourceLocation() : null;
final boolean isExplainOnly = isQuery && query.isExplain();
@@ -207,8 +208,8 @@
ILangExpressionToPlanTranslator t =
translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter, externalVars);
ResultMetadata resultMetadata = new ResultMetadata(output.config().fmt());
- ILogicalPlan plan =
- isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement, resultMetadata);
+ ILogicalPlan plan = isLoad || isCopy ? t.translateCopyOrLoad(statement)
+ : t.translate(query, outputDatasetName, statement, resultMetadata);
ICcApplicationContext ccAppContext = metadataProvider.getApplicationContext();
CompilerProperties compilerProperties = ccAppContext.getCompilerProperties();
@@ -233,7 +234,7 @@
builder.setWarningCollector(warningCollector);
builder.setMaxWarnings(conf.getMaxWarnings());
- if ((isQuery || isLoad) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)
+ if ((isQuery || isLoad || isCopy) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)
&& conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
generateLogicalPlan(plan, output.config().getPlanFormat(), cboMode);
}
@@ -272,7 +273,7 @@
PlanPrettyPrinter.printPhysicalOps(plan, buf, 0, true);
output.out().write(buf.toString());
} else {
- if (isQuery || isLoad) {
+ if (isQuery || isLoad || isCopy) {
generateOptimizedLogicalPlan(plan, output.config().getPlanFormat(), cboMode);
}
}
@@ -299,7 +300,7 @@
}
if (!conf.isGenerateJobSpec()) {
- if (isQuery || isLoad) {
+ if (isQuery || isLoad || isCopy) {
generateOptimizedLogicalPlan(plan, output.config().getPlanFormat(), cboMode);
}
return null;
@@ -324,7 +325,7 @@
}
if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN) || isExplainOnly) {
- if (isQuery || isLoad) {
+ if (isQuery || isLoad || isCopy) {
generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
cboMode);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f48e2b8..43218d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -112,6 +112,7 @@
import org.apache.asterix.lang.common.statement.AnalyzeStatement;
import org.apache.asterix.lang.common.statement.CompactStatement;
import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -209,6 +210,7 @@
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
import org.apache.asterix.translator.ClientRequest;
+import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -436,6 +438,12 @@
}
handleLoadStatement(metadataProvider, stmt, hcc);
break;
+ case COPY:
+ if (stats.getProfileType() == Stats.ProfileType.FULL) {
+ this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
+ }
+ handleCopyStatement(metadataProvider, stmt, hcc);
+ break;
case INSERT:
case UPSERT:
if (((InsertStatement) stmt).getReturnExpression() != null) {
@@ -3442,6 +3450,57 @@
}
}
+ protected Map<String, String> createExternalDataPropertiesForCopyStmt(DataverseName dataverseName,
+ CopyStatement copyStatement, Datatype itemType, MetadataTransactionContext mdTxnCtx)
+ throws AlgebricksException {
+ return copyStatement.getExternalDetails().getProperties();
+ }
+
+ protected void handleCopyStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
+ throws Exception {
+ CopyStatement copyStmt = (CopyStatement) stmt;
+ String datasetName = copyStmt.getDatasetName();
+ metadataProvider.validateDatabaseObjectName(copyStmt.getDataverseName(), datasetName,
+ copyStmt.getSourceLocation());
+ DataverseName dataverseName = getActiveDataverseName(copyStmt.getDataverseName());
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ boolean bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ try {
+ metadataProvider.setWriteTransaction(true);
+ Dataset dataset = metadataProvider.findDataset(dataverseName, copyStmt.getDatasetName());
+ Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(),
+ dataset.getItemTypeName());
+
+ ExternalDetailsDecl externalDetails = copyStmt.getExternalDetails();
+ Map<String, String> properties =
+ createExternalDataPropertiesForCopyStmt(dataverseName, copyStmt, itemType, mdTxnCtx);
+ ExternalDataUtils.normalize(properties);
+ ExternalDataUtils.validate(properties);
+ validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
+ appCtx);
+ CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(dataverseName,
+ copyStmt.getDatasetName(), externalDetails.getAdapter(), properties);
+ cls.setSourceLocation(stmt.getSourceLocation());
+ JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
+ null, responsePrinter, warningCollector, null);
+ afterCompile();
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+ if (spec != null && !isCompileOnly()) {
+ runJob(hcc, spec);
+ }
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ }
+
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp
new file mode 100644
index 0000000..e69b33b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ create dataverse test if not exists;
+ use test;
+
+create type AddressType as open {
+ number: int64,
+ street: string,
+ city: string
+};
+
+create type CustomerType as closed {
+ cid: int64,
+ name: string,
+ cashBack: int64,
+ age: int64?,
+ address: AddressType?,
+ lastorder: {
+ oid: int64,
+ total: float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp
new file mode 100644
index 0000000..002d43d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+upsert into Customers([
+ {
+ "cid": 1,
+ "name": "Jodi Rotruck",
+ "cashBack": 100,
+ "lastorder": { "oid": 66, "total": 38.618626f }
+ },
+ {
+ "cid": 1000,
+ "name": "ABC",
+ "cashBack": 100,
+ "lastorder": { "oid": 66, "total": 38.618626f }
+ }
+]);
+
+copy Customers
+using localfs
+(("path"="asterix_nc1://data/nontagged/customerData.json"),("format"="adm"));
+
+upsert into Customers([
+ {
+ "cid": 1,
+ "name": "Jodi Rotruck",
+ "cashBack": 100,
+ "lastorder": { "oid": 66, "total": 38.618626f }
+ },
+ {
+ "cid": 1000,
+ "name": "Jodi Rotruck",
+ "cashBack": 100,
+ "lastorder": { "oid": 66, "total": 38.618626f }
+ },
+ {
+ "cid": 4,
+ "name": "ABC",
+ "cashBack": 100,
+ "lastorder": { "oid": 66, "total": 38.618626f }
+ },
+ {
+ "cid": 1001,
+ "name": "XYZ",
+ "cashBack": 100,
+ "lastorder": { "oid": 66, "total": 38.618626f }
+ }
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp
new file mode 100644
index 0000000..e5a896b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ use test;
+
+ select value count(*) from
+ Customers;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp
new file mode 100644
index 0000000..ecbbe43
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// create container with data
+playground data_dir data/json/single-line/20-records.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp
new file mode 100644
index 0000000..d44645c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop dataset test1 if exists;
+create dataset test1 primary key (id: int);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp
new file mode 100644
index 0000000..a974a62
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+upsert into test1([
+ {
+ "id": 1
+ },
+ {
+ "id": 1000
+ }
+]);
+
+copy test1 USING S3 (
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="json")
+);
+
+upsert into test1([
+ {
+ "id": 1
+ },
+ {
+ "id": 1000
+ },
+ {
+ "id": 2
+ },
+ {
+ "id": 1001
+ }
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp
new file mode 100644
index 0000000..b72e741
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select count(*) `count` from test1;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm
new file mode 100644
index 0000000..c793025
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm
@@ -0,0 +1 @@
+7
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm
new file mode 100644
index 0000000..e5bcb83
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm
@@ -0,0 +1 @@
+{ "count": 22 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 7242984..8db1798 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -479,4 +479,11 @@
</compilation-unit>
</test-case>
</test-group>
+ <test-group name="copy">
+ <test-case FilePath="copy">
+ <compilation-unit name="copy-2">
+ <output-dir compare="Text">copy-2</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2792f91..7bcb473 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16274,4 +16274,11 @@
</compilation-unit>
</test-case>
</test-group>
+ <test-group name="copy">
+ <test-case FilePath="copy">
+ <compilation-unit name="copy-1">
+ <output-dir compare="Text">copy-1</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
</test-suite>
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 05d53b1..1654118 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -111,5 +111,6 @@
COMPACT,
SUBSCRIBE_FEED,
EXTENSION,
+ COPY
}
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java
new file mode 100644
index 0000000..3baf81a
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.AbstractStatement;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.util.ExpressionUtils;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.object.base.AdmObjectNode;
+
+/** COPY statement: the target dataset, its external source details and the node form of the with-record. */
+public class CopyStatement extends AbstractStatement {
+    private DataverseName dataverseName;
+    private String datasetName;
+    private ExternalDetailsDecl externalDetails;
+    private AdmObjectNode withObjectNode;
+
+    public CopyStatement(DataverseName dataverseName, String datasetName, ExternalDetailsDecl externalDetails,
+            RecordConstructor withRecord) throws CompilationException {
+        this.withObjectNode = withRecord != null ? ExpressionUtils.toNode(withRecord) : new AdmObjectNode();
+        this.externalDetails = externalDetails;
+        this.datasetName = datasetName;
+        this.dataverseName = dataverseName;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.COPY;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+
+    public DataverseName getDataverseName() {
+        return dataverseName;
+    }
+
+    public void setDataverseName(DataverseName dataverseName) {
+        this.dataverseName = dataverseName;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    public void setDatasetName(String datasetName) {
+        this.datasetName = datasetName;
+    }
+
+    public ExternalDetailsDecl getExternalDetails() {
+        return externalDetails;
+    }
+
+    public void setExternalDetails(ExternalDetailsDecl externalDetails) {
+        this.externalDetails = externalDetails;
+    }
+
+    public AdmObjectNode getWithObjectNode() {
+        return withObjectNode;
+    }
+
+    public void setWithObjectNode(AdmObjectNode withObjectNode) {
+        this.withObjectNode = withObjectNode;
+    }
+}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 7424398..23455f1 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -69,6 +69,7 @@
import org.apache.asterix.lang.common.statement.AnalyzeStatement;
import org.apache.asterix.lang.common.statement.CompactStatement;
import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -545,6 +546,16 @@
}
@Override
+    public Void visit(CopyStatement stmtCopy, Integer step) throws CompilationException {
+        String head = skip(step) + "copy " + datasetSymbol
+                + generateFullName(stmtCopy.getDataverseName(), stmtCopy.getDatasetName());
+        out.print(head + " using " + revertStringToQuoted(stmtCopy.getExternalDetails().getAdapter()) + " ");
+        printConfiguration(stmtCopy.getExternalDetails().getProperties());
+        out.println();
+        return null;
+    }
+
+ @Override
public Void visit(DropDatasetStatement del, Integer step) throws CompilationException {
out.println(
skip(step) + "drop " + datasetSymbol + generateFullName(del.getDataverseName(), del.getDatasetName())
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
index a7444e9..accf476 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
@@ -29,6 +29,7 @@
import org.apache.asterix.lang.common.statement.AnalyzeStatement;
import org.apache.asterix.lang.common.statement.CompactStatement;
import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -117,6 +118,11 @@
}
@Override
+ public R visit(CopyStatement stmtCopy, T arg) throws CompilationException {
+ return null; // no-op default, like the NodegroupDecl visitor next to it
+ }
+
+ @Override
public R visit(NodegroupDecl ngd, T arg) throws CompilationException {
return null;
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
index 541567d..9ad247c 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
@@ -47,6 +47,7 @@
import org.apache.asterix.lang.common.statement.AnalyzeStatement;
import org.apache.asterix.lang.common.statement.CompactStatement;
import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -101,6 +102,8 @@
R visit(LoadStatement stmtLoad, T arg) throws CompilationException;
+ R visit(CopyStatement stmtCopy, T arg) throws CompilationException;
+
R visit(DropDatasetStatement del, T arg) throws CompilationException;
R visit(InsertStatement insert, T arg) throws CompilationException;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 007e9af..01f72aa 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -159,6 +159,7 @@
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
import org.apache.asterix.lang.common.statement.LoadStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
import org.apache.asterix.lang.common.statement.NodegroupDecl;
import org.apache.asterix.lang.common.statement.Query;
@@ -929,6 +930,7 @@
| stmt = FunctionDeclaration()
| stmt = CreateStatement()
| stmt = LoadStatement()
+ | stmt = CopyStatement()
| stmt = DropStatement()
| stmt = SetStatement()
| stmt = InsertStatement()
@@ -2714,6 +2716,36 @@
}
}
+// COPY [INTO] <qualified dataset name> USING <adapter> <configuration>; located at the COPY keyword.
+CopyStatement CopyStatement() throws ParseException:
+{
+  Token startToken = null;
+  DataverseName dataverseName = null;
+  Identifier datasetName = null;
+  String adapterName;
+  Map<String,String> properties;
+  Pair<DataverseName,Identifier> nameComponents = null;
+}
+{
+  <COPY> { startToken = token; } (<INTO>)? nameComponents = QualifiedName()
+  {
+    dataverseName = nameComponents.first;
+    datasetName = nameComponents.second;
+  }
+  <USING> adapterName = AdapterName() properties = Configuration()
+  {
+    ExternalDetailsDecl edd = new ExternalDetailsDecl();
+    edd.setAdapter(adapterName);
+    edd.setProperties(properties);
+    try {
+      CopyStatement stmt = new CopyStatement(dataverseName, datasetName.getValue(), edd, null);
+      return addSourceLocation(stmt, startToken);
+    } catch (CompilationException e){
+      throw new SqlppParseException(getSourceLocation(startToken), e.getMessage());
+    }
+  }
+}
+
LoadStatement LoadStatement() throws ParseException:
{
Token startToken = null;
@@ -2743,7 +2775,6 @@
}
}
-
String AdapterName() throws ParseException :
{
String adapterName = null;
@@ -5709,6 +5740,7 @@
| <WHERE : "where">
| <WITH : "with">
| <WRITE : "write">
+ | <COPY : "copy">
}
<DEFAULT,IN_DBL_BRACE>