[ASTERIXDB-3170][COMP] Support COPY Statment

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:
With this change, we add support for COPY statement, that can be
used to upsert data into a dataset from an external data source.

Change-Id: I612978472f090ab3c32e901aa37087ed5b7edf92
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17499
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
index fea9340..0116576 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/base/ILangExpressionToPlanTranslator.java
@@ -51,10 +51,10 @@
      *
      * @param stmt,
      *            the compiled load statement.
-     * @return a logical query plan for the load statement.
+     * @return a logical query plan for the Copy/Load statement.
      * @throws AlgebricksException
      */
-    public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws AlgebricksException;
+    public ILogicalPlan translateCopyOrLoad(ICompiledDmlStatement stmt) throws AlgebricksException;
 
     /**
      * @return the current minimum available variable id.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index 088676c..4981f0e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -286,6 +286,50 @@
         }
     }
 
+    public static class CompiledCopyFromFileStatement extends AbstractCompiledStatement
+            implements ICompiledDmlStatement {
+        private final DataverseName dataverseName;
+        private final String datasetName;
+        private final String adapter;
+        private final Map<String, String> properties;
+
+        public CompiledCopyFromFileStatement(DataverseName dataverseName, String datasetName, String adapter,
+                Map<String, String> properties) {
+            this.dataverseName = dataverseName;
+            this.datasetName = datasetName;
+            this.adapter = adapter;
+            this.properties = properties;
+        }
+
+        @Override
+        public DataverseName getDataverseName() {
+            return dataverseName;
+        }
+
+        @Override
+        public String getDatasetName() {
+            return datasetName;
+        }
+
+        public String getAdapter() {
+            return adapter;
+        }
+
+        public Map<String, String> getProperties() {
+            return properties;
+        }
+
+        @Override
+        public Statement.Kind getKind() {
+            return Statement.Kind.COPY;
+        }
+
+        @Override
+        public byte getCategory() {
+            return Statement.Category.UPDATE;
+        }
+    }
+
     public static class CompiledInsertStatement extends AbstractCompiledStatement implements ICompiledDmlStatement {
         private final DataverseName dataverseName;
         private final String datasetName;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index bd06729..52cddc5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -44,6 +44,7 @@
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Expression.Kind;
 import org.apache.asterix.lang.common.base.ILangExpression;
+import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.clause.LimitClause;
@@ -99,6 +100,7 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
@@ -194,15 +196,13 @@
         return context.getVarCounter();
     }
 
-    @Override
-    public ILogicalPlan translateLoad(ICompiledDmlStatement stmt) throws AlgebricksException {
-        CompiledLoadFromFileStatement clffs = (CompiledLoadFromFileStatement) stmt;
+    public ILogicalPlan translateCopyOrLoad(ICompiledDmlStatement stmt) throws AlgebricksException {
         SourceLocation sourceLoc = stmt.getSourceLocation();
-        Dataset dataset = metadataProvider.findDataset(clffs.getDataverseName(), clffs.getDatasetName());
+        Dataset dataset = metadataProvider.findDataset(stmt.getDataverseName(), stmt.getDatasetName());
         if (dataset == null) {
             // This would never happen since we check for this in AqlTranslator
-            throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, clffs.getDatasetName(),
-                    clffs.getDataverseName());
+            throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, stmt.getDatasetName(),
+                    stmt.getDataverseName());
         }
         IAType itemType = metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         IAType metaItemType =
@@ -219,7 +219,18 @@
 
         LoadableDataSource lds;
         try {
-            lds = new LoadableDataSource(dataset, itemType, metaItemType, clffs.getAdapter(), clffs.getProperties());
+            if (stmt.getKind() == Statement.Kind.LOAD) {
+                lds = new LoadableDataSource(dataset, itemType, metaItemType,
+                        ((CompiledLoadFromFileStatement) stmt).getAdapter(),
+                        ((CompiledLoadFromFileStatement) stmt).getProperties());
+            } else if (stmt.getKind() == Statement.Kind.COPY) {
+                lds = new LoadableDataSource(dataset, itemType, metaItemType,
+                        ((CompiledCopyFromFileStatement) stmt).getAdapter(),
+                        ((CompiledCopyFromFileStatement) stmt).getProperties());
+            } else {
+                throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Unrecognized Statement Type",
+                        stmt.getKind());
+            }
         } catch (IOException e) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, e.toString(), e);
         }
@@ -258,14 +269,16 @@
         assign.getInputs().add(new MutableObject<>(dssOp));
         assign.setSourceLocation(sourceLoc);
 
-        // If the input is pre-sorted, we set the ordering property explicitly in the
-        // assign
-        if (clffs.alreadySorted()) {
-            List<OrderColumn> orderColumns = new ArrayList<>();
-            for (int i = 0; i < pkVars.size(); ++i) {
-                orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC));
+        if (stmt.getKind() == Statement.Kind.LOAD) {
+            // If the input is pre-sorted, we set the ordering property explicitly in the
+            // assign
+            if (((CompiledLoadFromFileStatement) stmt).alreadySorted()) {
+                List<OrderColumn> orderColumns = new ArrayList<>();
+                for (int i = 0; i < pkVars.size(); ++i) {
+                    orderColumns.add(new OrderColumn(pkVars.get(i), OrderKind.ASC));
+                }
+                assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns));
             }
-            assign.setExplicitOrderingProperty(new LocalOrderProperty(orderColumns));
         }
 
         // Load does not support meta record now.
@@ -285,22 +298,49 @@
             additionalFilteringAssign.setSourceLocation(sourceLoc);
         }
 
-        InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef,
-                varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, true);
-        insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        insertOp.setSourceLocation(sourceLoc);
+        if (stmt.getKind() == Statement.Kind.LOAD) {
+            InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef,
+                    varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, true);
+            insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+            insertOp.setSourceLocation(sourceLoc);
 
-        if (additionalFilteringAssign != null) {
-            additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
-            insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+            if (additionalFilteringAssign != null) {
+                additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
+                insertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+            } else {
+                insertOp.getInputs().add(new MutableObject<>(assign));
+            }
+
+            SinkOperator leafOperator = new SinkOperator();
+            leafOperator.getInputs().add(new MutableObject<>(insertOp));
+            leafOperator.setSourceLocation(sourceLoc);
+            return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
+        } else if (stmt.getKind() == Statement.Kind.COPY) {
+            InsertDeleteUpsertOperator upsertOp = new InsertDeleteUpsertOperator(targetDatasource, payloadRef,
+                    varRefsForLoading, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+            upsertOp.setSourceLocation(sourceLoc);
+
+            if (additionalFilteringAssign != null) {
+                additionalFilteringAssign.getInputs().add(new MutableObject<>(assign));
+                upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+            } else {
+                upsertOp.getInputs().add(new MutableObject<>(assign));
+            }
+            upsertOp.setOperationVar(context.newVar());
+            upsertOp.setOperationVarType(BuiltinType.AINT8);
+            // Create and add a new variable used for representing the original record
+            upsertOp.setPrevRecordVar(context.newVar());
+            upsertOp.setPrevRecordType(itemType);
+
+            DelegateOperator delegateOperator = new DelegateOperator(new CommitOperator(true));
+            delegateOperator.getInputs().add(new MutableObject<>(upsertOp));
+            delegateOperator.setSourceLocation(sourceLoc);
+            return new ALogicalPlanImpl(new MutableObject<>(delegateOperator));
         } else {
-            insertOp.getInputs().add(new MutableObject<>(assign));
+            throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Unrecognized Statement Type",
+                    stmt.getKind());
         }
-
-        SinkOperator leafOperator = new SinkOperator();
-        leafOperator.getInputs().add(new MutableObject<>(insertOp));
-        leafOperator.setSourceLocation(sourceLoc);
-        return new ALogicalPlanImpl(new MutableObject<>(leafOperator));
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 41be44b..afdcee4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -192,6 +192,7 @@
         // establish facts
         final boolean isQuery = query != null;
         final boolean isLoad = statement != null && statement.getKind() == Statement.Kind.LOAD;
+        final boolean isCopy = statement != null && statement.getKind() == Statement.Kind.COPY;
         final SourceLocation sourceLoc =
                 query != null ? query.getSourceLocation() : statement != null ? statement.getSourceLocation() : null;
         final boolean isExplainOnly = isQuery && query.isExplain();
@@ -207,8 +208,8 @@
         ILangExpressionToPlanTranslator t =
                 translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter, externalVars);
         ResultMetadata resultMetadata = new ResultMetadata(output.config().fmt());
-        ILogicalPlan plan =
-                isLoad ? t.translateLoad(statement) : t.translate(query, outputDatasetName, statement, resultMetadata);
+        ILogicalPlan plan = isLoad || isCopy ? t.translateCopyOrLoad(statement)
+                : t.translate(query, outputDatasetName, statement, resultMetadata);
 
         ICcApplicationContext ccAppContext = metadataProvider.getApplicationContext();
         CompilerProperties compilerProperties = ccAppContext.getCompilerProperties();
@@ -233,7 +234,7 @@
         builder.setWarningCollector(warningCollector);
         builder.setMaxWarnings(conf.getMaxWarnings());
 
-        if ((isQuery || isLoad) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)
+        if ((isQuery || isLoad || isCopy) && !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)
                 && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
             generateLogicalPlan(plan, output.config().getPlanFormat(), cboMode);
         }
@@ -272,7 +273,7 @@
                     PlanPrettyPrinter.printPhysicalOps(plan, buf, 0, true);
                     output.out().write(buf.toString());
                 } else {
-                    if (isQuery || isLoad) {
+                    if (isQuery || isLoad || isCopy) {
                         generateOptimizedLogicalPlan(plan, output.config().getPlanFormat(), cboMode);
                     }
                 }
@@ -299,7 +300,7 @@
         }
 
         if (!conf.isGenerateJobSpec()) {
-            if (isQuery || isLoad) {
+            if (isQuery || isLoad || isCopy) {
                 generateOptimizedLogicalPlan(plan, output.config().getPlanFormat(), cboMode);
             }
             return null;
@@ -324,7 +325,7 @@
         }
 
         if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN) || isExplainOnly) {
-            if (isQuery || isLoad) {
+            if (isQuery || isLoad || isCopy) {
                 generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(),
                         cboMode);
             }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f48e2b8..43218d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -112,6 +112,7 @@
 import org.apache.asterix.lang.common.statement.AnalyzeStatement;
 import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
 import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -209,6 +210,7 @@
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.ClientRequest;
+import org.apache.asterix.translator.CompiledStatements.CompiledCopyFromFileStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -436,6 +438,12 @@
                         }
                         handleLoadStatement(metadataProvider, stmt, hcc);
                         break;
+                    case COPY:
+                        if (stats.getProfileType() == Stats.ProfileType.FULL) {
+                            this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
+                        }
+                        handleCopyStatement(metadataProvider, stmt, hcc);
+                        break;
                     case INSERT:
                     case UPSERT:
                         if (((InsertStatement) stmt).getReturnExpression() != null) {
@@ -3442,6 +3450,57 @@
         }
     }
 
+    protected Map<String, String> createExternalDataPropertiesForCopyStmt(DataverseName dataverseName,
+            CopyStatement copyStatement, Datatype itemType, MetadataTransactionContext mdTxnCtx)
+            throws AlgebricksException {
+        return copyStatement.getExternalDetails().getProperties();
+    }
+
+    protected void handleCopyStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
+            throws Exception {
+        CopyStatement copyStmt = (CopyStatement) stmt;
+        String datasetName = copyStmt.getDatasetName();
+        metadataProvider.validateDatabaseObjectName(copyStmt.getDataverseName(), datasetName,
+                copyStmt.getSourceLocation());
+        DataverseName dataverseName = getActiveDataverseName(copyStmt.getDataverseName());
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+        try {
+            metadataProvider.setWriteTransaction(true);
+            Dataset dataset = metadataProvider.findDataset(dataverseName, copyStmt.getDatasetName());
+            Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(),
+                    dataset.getItemTypeName());
+
+            ExternalDetailsDecl externalDetails = copyStmt.getExternalDetails();
+            Map<String, String> properties =
+                    createExternalDataPropertiesForCopyStmt(dataverseName, copyStmt, itemType, mdTxnCtx);
+            ExternalDataUtils.normalize(properties);
+            ExternalDataUtils.validate(properties);
+            validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
+                    appCtx);
+            CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(dataverseName,
+                    copyStmt.getDatasetName(), externalDetails.getAdapter(), properties);
+            cls.setSourceLocation(stmt.getSourceLocation());
+            JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls,
+                    null, responsePrinter, warningCollector, null);
+            afterCompile();
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+            if (spec != null && !isCompileOnly()) {
+                runJob(hcc, spec);
+            }
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
             ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp
new file mode 100644
index 0000000..e69b33b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ create dataverse test if not exists;
+ use test;
+
+create type AddressType as open {
+  number: int64,
+  street: string,
+  city: string
+};
+
+create type CustomerType as closed {
+  cid: int64,
+  name: string,
+  cashBack: int64,
+  age: int64?,
+  address: AddressType?,
+  lastorder: {
+    oid: int64,
+    total: float
+  }
+};
+
+create dataset Customers(CustomerType) primary key cid;
+
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp
new file mode 100644
index 0000000..002d43d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.2.update.sqlpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use test;
+
+upsert into Customers([
+    {
+        "cid": 1,
+        "name": "Jodi Rotruck",
+        "cashBack": 100,
+        "lastorder": {  "oid": 66,  "total": 38.618626f }
+    },
+    {
+        "cid": 1000,
+        "name": "ABC",
+        "cashBack": 100,
+        "lastorder": {  "oid": 66,  "total": 38.618626f }
+    }
+]);
+
+copy Customers
+using localfs
+(("path"="asterix_nc1://data/nontagged/customerData.json"),("format"="adm"));
+
+upsert into Customers([
+    {
+        "cid": 1,
+        "name": "Jodi Rotruck",
+        "cashBack": 100,
+        "lastorder": {  "oid": 66,  "total": 38.618626f }
+    },
+    {
+        "cid": 1000,
+        "name": "Jodi Rotruck",
+        "cashBack": 100,
+        "lastorder": {  "oid": 66,  "total": 38.618626f }
+    },
+    {
+        "cid": 4,
+        "name": "ABC",
+        "cashBack": 100,
+        "lastorder": {  "oid": 66,  "total": 38.618626f }
+    },
+    {
+        "cid": 1001,
+        "name": "XYZ",
+        "cashBack": 100,
+        "lastorder": {  "oid": 66,  "total": 38.618626f }
+    }
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp
new file mode 100644
index 0000000..e5a896b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-1/copy-1.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+  use test;
+
+  select value count(*) from
+  Customers;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp
new file mode 100644
index 0000000..ecbbe43
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.0.container.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// create container with data
+playground data_dir data/json/single-line/20-records.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp
new file mode 100644
index 0000000..d44645c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.1.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop dataset test1 if exists;
+create dataset test1 primary key (id: int);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp
new file mode 100644
index 0000000..a974a62
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.2.update.sqlpp
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+upsert into test1([
+    {
+        "id": 1
+    },
+    {
+        "id": 1000
+    }
+]);
+
+copy test1 USING S3 (
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="data_dir"),
+("format"="json")
+);
+
+upsert into test1([
+    {
+        "id": 1
+    },
+    {
+        "id": 1000
+    },
+    {
+        "id": 2
+    },
+    {
+        "id": 1001
+    }
+]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp
new file mode 100644
index 0000000..b72e741
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy/copy-2/copy-2.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select count(*) `count` from test1;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm
new file mode 100644
index 0000000..c793025
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-1/copy-1.1.adm
@@ -0,0 +1 @@
+7
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm
new file mode 100644
index 0000000..e5bcb83
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy/copy-2/copy-2.1.adm
@@ -0,0 +1 @@
+{ "count": 22 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 7242984..8db1798 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -479,4 +479,11 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="copy">
+    <test-case FilePath="copy">
+      <compilation-unit name="copy-2">
+        <output-dir compare="Text">copy-2</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2792f91..7bcb473 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16274,4 +16274,11 @@
       </compilation-unit>
     </test-case>
   </test-group>
+  <test-group name="copy">
+    <test-case FilePath="copy">
+      <compilation-unit name="copy-1">
+        <output-dir compare="Text">copy-1</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
index 05d53b1..1654118 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/base/Statement.java
@@ -111,5 +111,6 @@
         COMPACT,
         SUBSCRIBE_FEED,
         EXTENSION,
+        COPY
     }
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java
new file mode 100644
index 0000000..3baf81a
--- /dev/null
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CopyStatement.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.lang.common.statement;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.lang.common.base.AbstractStatement;
+import org.apache.asterix.lang.common.expression.RecordConstructor;
+import org.apache.asterix.lang.common.util.ExpressionUtils;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.object.base.AdmObjectNode;
+
+public class CopyStatement extends AbstractStatement {
+
+    private DataverseName dataverseName;
+    private String datasetName;
+    private ExternalDetailsDecl externalDetails;
+    private AdmObjectNode withObjectNode;
+
+    public CopyStatement(DataverseName dataverseName, String datasetName, ExternalDetailsDecl externalDetails,
+            RecordConstructor withRecord) throws CompilationException {
+        this.dataverseName = dataverseName;
+        this.datasetName = datasetName;
+        this.externalDetails = externalDetails;
+        this.withObjectNode = withRecord == null ? new AdmObjectNode() : ExpressionUtils.toNode(withRecord);
+    }
+
+    public DataverseName getDataverseName() {
+        return dataverseName;
+    }
+
+    public void setDataverseName(DataverseName dataverseName) {
+        this.dataverseName = dataverseName;
+    }
+
+    @Override
+    public Kind getKind() {
+        return Kind.COPY;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    public void setDatasetName(String datasetName) {
+        this.datasetName = datasetName;
+    }
+
+    @Override
+    public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
+        return visitor.visit(this, arg);
+    }
+
+    public ExternalDetailsDecl getExternalDetails() {
+        return externalDetails;
+    }
+
+    public void setExternalDetails(ExternalDetailsDecl externalDetails) {
+        this.externalDetails = externalDetails;
+    }
+
+    public AdmObjectNode getWithObjectNode() {
+        return withObjectNode;
+    }
+
+    public void setWithObjectNode(AdmObjectNode withObjectNode) {
+        this.withObjectNode = withObjectNode;
+    }
+
+    @Override
+    public byte getCategory() {
+        return Category.UPDATE;
+    }
+}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
index 7424398..23455f1 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/FormatPrintVisitor.java
@@ -69,6 +69,7 @@
 import org.apache.asterix.lang.common.statement.AnalyzeStatement;
 import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
 import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -545,6 +546,16 @@
     }
 
     @Override
+    public Void visit(CopyStatement stmtCopy, Integer step) throws CompilationException {
+        out.print(skip(step) + "copy " + datasetSymbol
+                + generateFullName(stmtCopy.getDataverseName(), stmtCopy.getDatasetName()) + " using "
+                + revertStringToQuoted(stmtCopy.getExternalDetails().getAdapter()) + " ");
+        printConfiguration(stmtCopy.getExternalDetails().getProperties());
+        out.println();
+        return null;
+    }
+
+    @Override
     public Void visit(DropDatasetStatement del, Integer step) throws CompilationException {
         out.println(
                 skip(step) + "drop " + datasetSymbol + generateFullName(del.getDataverseName(), del.getDatasetName())
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
index a7444e9..accf476 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/AbstractQueryExpressionVisitor.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.lang.common.statement.AnalyzeStatement;
 import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
 import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -117,6 +118,11 @@
     }
 
     @Override
+    public R visit(CopyStatement stmtCopy, T arg) throws CompilationException {
+        return null;
+    }
+
+    @Override
     public R visit(NodegroupDecl ngd, T arg) throws CompilationException {
         return null;
     }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
index 541567d..9ad247c 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/base/ILangVisitor.java
@@ -47,6 +47,7 @@
 import org.apache.asterix.lang.common.statement.AnalyzeStatement;
 import org.apache.asterix.lang.common.statement.CompactStatement;
 import org.apache.asterix.lang.common.statement.ConnectFeedStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
 import org.apache.asterix.lang.common.statement.CreateAdapterStatement;
 import org.apache.asterix.lang.common.statement.CreateDataverseStatement;
 import org.apache.asterix.lang.common.statement.CreateFeedPolicyStatement;
@@ -101,6 +102,8 @@
 
     R visit(LoadStatement stmtLoad, T arg) throws CompilationException;
 
+    R visit(CopyStatement stmtCopy, T arg) throws CompilationException;
+
     R visit(DropDatasetStatement del, T arg) throws CompilationException;
 
     R visit(InsertStatement insert, T arg) throws CompilationException;
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 007e9af..01f72aa 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -159,6 +159,7 @@
 import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
 import org.apache.asterix.lang.common.statement.LoadStatement;
+import org.apache.asterix.lang.common.statement.CopyStatement;
 import org.apache.asterix.lang.common.statement.NodeGroupDropStatement;
 import org.apache.asterix.lang.common.statement.NodegroupDecl;
 import org.apache.asterix.lang.common.statement.Query;
@@ -929,6 +930,7 @@
     | stmt = FunctionDeclaration()
     | stmt = CreateStatement()
     | stmt = LoadStatement()
+    | stmt = CopyStatement()
     | stmt = DropStatement()
     | stmt = SetStatement()
     | stmt = InsertStatement()
@@ -2714,6 +2716,36 @@
     }
 }
 
+CopyStatement CopyStatement() throws ParseException:
+{
+  Token startToken = null;
+  DataverseName dataverseName = null;
+  Identifier datasetName = null;
+  boolean alreadySorted = false;
+  String adapterName;
+  Map<String,String> properties;
+  Pair<DataverseName,Identifier> nameComponents = null;
+}
+{
+  <COPY> (<INTO>)? { startToken = token; } nameComponents = QualifiedName()
+    {
+      dataverseName = nameComponents.first;
+      datasetName = nameComponents.second;
+    }
+  <USING> adapterName = AdapterName() properties = Configuration()
+    {
+       ExternalDetailsDecl edd = new ExternalDetailsDecl();
+       edd.setAdapter(adapterName);
+       edd.setProperties(properties);
+       try {
+         CopyStatement stmt = new CopyStatement(dataverseName, datasetName.getValue(), edd, null);
+         return addSourceLocation(stmt, startToken);
+       } catch (CompilationException e){
+           throw new SqlppParseException(getSourceLocation(startToken), e.getMessage());
+       }
+    }
+}
+
 LoadStatement LoadStatement() throws ParseException:
 {
   Token startToken = null;
@@ -2743,7 +2775,6 @@
     }
 }
 
-
 String AdapterName() throws ParseException :
 {
   String adapterName = null;
@@ -5709,6 +5740,7 @@
   | <WHERE : "where">
   | <WITH : "with">
   | <WRITE : "write">
+  | <COPY : "copy">
 }
 
 <DEFAULT,IN_DBL_BRACE>