Enhanced Insert AQL

The optional "as Variable" provides a variable binding for the inserted records
The optional "returning Query" allows users to run simple
queries/functions on the records returned by the insert, and can refer
to the variable bound in "as Variable"

Allow commits to be non-sink operators (contnue job pipeline after commit)

Additionally, this change makes small modifications to
the extension code to prepare for the BAD extension

Also made the OptimizerTests able to work for Extensions

Change-Id: I65789d2a861d15232dd29156a6987d0635ec6c94
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1150
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 9f4e650..efba47e 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -1,3 +1,21 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.    See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 2dd9fe7..d0fb5e8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,6 +31,6 @@
 
     public EntityId getEntityId();
 
-    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName);
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName);
 
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index e88962a..d15ae6f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -21,8 +21,11 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -36,15 +39,17 @@
     }
 
     /**
-     * Called when the {@code IQueryTranslator} encounters an extension statement.
+     * Called when the {@code IStatementExecutor} encounters an extension statement.
      * An implementation class should implement the actual processing of the statement in this method.
      *
      * @param queryTranslator
      * @param metadataProvider
      * @param statementExecutor
      * @param hcc
+     * @param resultSetIdCounter
      * @throws Exception
      */
     void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws HyracksDataException, AlgebricksException;
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException;
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
index 8dd6d33..1ee130c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
@@ -19,23 +19,32 @@
 
 package org.apache.asterix.algebra.operators;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
-public class CommitOperator extends AbstractExtensibleLogicalOperator {
+public class CommitOperator extends AbstractDelegatedLogicalOperator {
 
-    private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final LogicalVariable upsertVar;
+    private List<LogicalVariable> primaryKeyLogicalVars;
+    private LogicalVariable upsertVar;
+    private boolean isSink;
 
-    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+    public CommitOperator(boolean isSink) {
+        this.isSink = isSink;
+        this.upsertVar = null;
+        primaryKeyLogicalVars = new ArrayList<>();
+    }
+
+    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
+        this.isSink = isSink;
     }
 
     @Override
@@ -44,9 +53,23 @@
         return false;
     }
 
+    public boolean isSink() {
+        return isSink;
+    }
+
+    public void setSink(boolean isSink) {
+        this.isSink = isSink;
+    }
+
+    //Provided for Extensions but not used by core
+    public void setVars(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+        this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+        this.upsertVar = upsertVar;
+    }
+
     @Override
-    public IOperatorExtension newInstance() {
-        return new CommitOperator(primaryKeyLogicalVars, upsertVar);
+    public IOperatorDelegate newInstance() {
+        return new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 1a26021..5a1c929 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -47,15 +47,17 @@
     private final String dataverse;
     private final String dataset;
     private final LogicalVariable upsertVar;
+    private final boolean isSink;
 
     public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId,
-            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
         this.dataverse = dataverse;
         this.dataset = dataset;
+        this.isSink = isSink;
     }
 
     @Override
@@ -105,7 +107,7 @@
         }
         runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
                 metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions);
+                datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index c6c71f6..a1fa788 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -31,8 +31,7 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -45,7 +44,7 @@
 import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
-public class CommitRuntime implements IPushRuntime {
+public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
 
     private final static long SEED = 0L;
 
@@ -57,27 +56,27 @@
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
-    protected final FrameTupleReference frameTupleReference;
     protected final IHyracksTaskContext ctx;
     protected final int resourcePartition;
     protected ITransactionContext transactionContext;
     protected LogRecord logRecord;
-    protected FrameTupleAccessor frameTupleAccessor;
+    protected final boolean isSink;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) {
         this.ctx = ctx;
-        IAsterixAppRuntimeContext runtimeCtx =
-                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
-        this.frameTupleReference = new FrameTupleReference();
+        this.tRef = new FrameTupleReference();
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
+        this.isSink = isSink;
         longHashes = new long[2];
     }
 
@@ -86,9 +85,14 @@
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            ILogMarkerCallback callback = TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK,
+                    ctx);
             logRecord = new LogRecord(callback);
+            if (isSink) {
+                return;
+            }
+            initAccessAppend(ctx);
+            writer.open();
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -96,26 +100,27 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        frameTupleAccessor.reset(buffer);
-        int nTuple = frameTupleAccessor.getTupleCount();
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
         for (int t = 0; t < nTuple; t++) {
             if (isTemporaryDatasetWriteJob) {
                 /**
-                 * This "if branch" is for writes over temporary datasets.
-                 * A temporary dataset does not require any lock and does not generate any write-ahead
-                 * update and commit log but generates flush log and job commit log.
-                 * However, a temporary dataset still MUST guarantee no-steal policy so that this
-                 * notification call should be delivered to PrimaryIndexOptracker and used correctly in order
-                 * to decrement number of active operation count of PrimaryIndexOptracker.
-                 * By maintaining the count correctly and only allowing flushing when the count is 0, it can
-                 * guarantee the no-steal policy for temporary datasets, too.
+                 * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock
+                 * and does not generate any write-ahead update and commit log but generates flush log and job commit
+                 * log. However, a temporary dataset still MUST guarantee no-steal policy so that this notification call
+                 * should be delivered to PrimaryIndexOptracker and used correctly in order to decrement number of
+                 * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
+                 * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
                  */
                 transactionContext.notifyOptracker(false);
             } else {
-                frameTupleReference.reset(frameTupleAccessor, t);
+                tRef.reset(tAccess, t);
                 try {
                     formLogRecord(buffer, t);
                     logMgr.log(logRecord);
+                    if (!isSink) {
+                        appendTupleToFrame(t);
+                    }
                 } catch (ACIDException e) {
                     throw new HyracksDataException(e);
                 }
@@ -141,8 +146,8 @@
     }
 
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, frameTupleReference,
+        int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
                 primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
     }
 
@@ -153,22 +158,27 @@
 
     @Override
     public void fail() throws HyracksDataException {
-
+        failed = true;
+        if (isSink) {
+            return;
+        }
+        writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-
-    }
-
-    @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-        throw new IllegalStateException();
+        if (isSink) {
+            return;
+        }
+        flushIfNotFailed();
+        writer.close();
+        appender.reset(frame, true);
     }
 
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
-        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index 4f28b9d..9486a19 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -36,9 +36,10 @@
     private final boolean isWriteTransaction;
     private final int upsertVarIdx;
     private int[] datasetPartitions;
+    private final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions) {
+            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
@@ -46,6 +47,7 @@
         this.isWriteTransaction = isWriteTransaction;
         this.upsertVarIdx = upsertVarIdx;
         this.datasetPartitions = datasetPartitions;
+        this.isSink = isSink;
     }
 
     @Override
@@ -58,10 +60,10 @@
         if (upsertVarIdx >= 0) {
             return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
                     isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
-                    upsertVarIdx);
+                    upsertVarIdx, isSink);
         } else {
             return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()]);
+                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
         }
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
index 7358700..53e0f62 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
@@ -30,25 +30,25 @@
     private final int upsertIdx;
 
     public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
+            boolean isSink) {
         super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
-                resourcePartition);
+                resourcePartition, isSink);
         this.upsertIdx = upsertIdx;
     }
 
     @Override
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(),
-                frameTupleAccessor.getFieldSlotsLength() + frameTupleAccessor.getTupleStartOffset(t)
-                        + frameTupleAccessor.getFieldStartOffset(t, upsertIdx) + 1);
+        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength()
+                + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1);
         if (isNull) {
             // Previous record not found (insert)
             super.formLogRecord(buffer, t);
         } else {
             // Previous record found (delete + insert)
-            int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash,
-                    frameTupleReference, primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
+            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
+                    primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
         }
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index cd8d747..56a3bd0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -66,7 +66,7 @@
 import org.apache.asterix.optimizer.rules.RemoveRedundantSelectRule;
 import org.apache.asterix.optimizer.rules.RemoveSortInFeedIngestionRule;
 import org.apache.asterix.optimizer.rules.RemoveUnusedOneToOneEquiJoinRule;
-import org.apache.asterix.optimizer.rules.ReplaceSinkOpWithCommitOpRule;
+import org.apache.asterix.optimizer.rules.SetupCommitExtensionOpRule;
 import org.apache.asterix.optimizer.rules.ResolveVariableRule;
 import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import org.apache.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
@@ -317,7 +317,7 @@
         List<IAlgebraicRewriteRule> physicalRewritesAllLevels = new LinkedList<>();
         physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
         //Turned off the following rule for now not to change OptimizerTest results.
-        physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
+        physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
index fee1c96..c8d2760 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -18,9 +18,11 @@
  */
 package org.apache.asterix.optimizer.rules;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
@@ -44,6 +46,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -62,13 +65,35 @@
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
 
-        // match: [insert to internal dataset with autogenerated id] - assign - project
+        // match: commit OR distribute-result OR SINK - ... followed by:
+        // [insert to internal dataset with autogenerated id] - assign - project
         // produce: insert - assign - assign* - project
         // **
         // OR [insert to internal dataset with autogenerated id] - assign - [datasource scan]
         // produce insert - assign - assign* - datasource scan
 
         AbstractLogicalOperator currentOp = (AbstractLogicalOperator) opRef.getValue();
+        if (currentOp.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator dOp = (DelegateOperator) currentOp;
+            if (!(dOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            } else if (!((CommitOperator) dOp.getDelegate()).isSink()) {
+                return false;
+            }
+
+        } else if (currentOp.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT
+                && currentOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+            return false;
+        }
+        ArrayDeque<AbstractLogicalOperator> opStack = new ArrayDeque<>();
+        opStack.push(currentOp);
+        while (currentOp.getInputs().size() == 1) {
+            currentOp = (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
+            if (currentOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                break;
+            }
+            opStack.push(currentOp);
+        }
         if (currentOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
@@ -95,7 +120,8 @@
         AssignOperator assignOp = (AssignOperator) parentOp;
         LogicalVariable inputRecord;
 
-        //bug here. will not work for internal datasets with filters since the pattern becomes [project-assign-assign-insert] <-this should be fixed->
+        //TODO: bug here. will not work for internal datasets with filters since the pattern becomes 
+        //[project-assign-assign-insert]
         AbstractLogicalOperator grandparentOp = (AbstractLogicalOperator) parentOp.getInputs().get(0).getValue();
         if (grandparentOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
             ProjectOperator projectOp = (ProjectOperator) grandparentOp;
@@ -122,7 +148,12 @@
         VariableUtilities.substituteVariables(insertOp, inputRecord, v, context);
         context.computeAndSetTypeEnvironmentForOperator(newAssign);
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(insertOp);
+        context.computeAndSetTypeEnvironmentForOperator(insertOp);;
+        for (AbstractLogicalOperator op : opStack) {
+            VariableUtilities.substituteVariables(op, inputRecord, v, context);
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+
         return true;
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
index f6cd015..28bcc7f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -46,6 +47,8 @@
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -90,11 +93,19 @@
         // We identify INSERT and DISTRIBUTE_RESULT operators.
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
         switch (op1.getOperatorTag()) {
-            case SINK: {
+            case SINK:
+            case DELEGATE_OPERATOR: {
                 /**
-                 * pattern match: sink insert assign
-                 * resulting plan: sink-insert-project-assign
+                 * pattern match: commit insert assign
+                 * resulting plan: commit-insert-project-assign
                  */
+                if (op1.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+                    DelegateOperator eOp = (DelegateOperator) op1;
+                    if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                        return false;
+                    }
+                }
+
                 AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
                 if (op2.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                     InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op2;
@@ -131,21 +142,8 @@
                 // Remember this is the operator we need to modify
                 op = op1;
 
-                // The Variable we want is the (hopefully singular, hopefully record-typed) live variable
-                // of the singular input operator of the DISTRIBUTE_RESULT
-                if (op.getInputs().size() > 1) {
-                    // Hopefully not possible?
-                    throw new AlgebricksException(
-                            "output-record-type defined for expression with multiple input operators");
-                }
-                AbstractLogicalOperator input = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-                List<LogicalVariable> liveVars = new ArrayList<>();
-                VariableUtilities.getLiveVariables(input, liveVars);
-                if (liveVars.size() > 1) {
-                    throw new AlgebricksException(
-                            "Expression with multiple fields cannot be cast to output-record-type!");
-                }
-                recordVar = liveVars.get(0);
+                recordVar = ((VariableReferenceExpression) ((DistributeResultOperator) op).getExpressions().get(0)
+                        .getValue()).getVariableReference();
                 break;
             }
             default: {
@@ -210,15 +208,15 @@
                 if (var.equals(recordVar)) {
                     /** insert an assign operator to call the function on-top-of the variable */
                     IAType actualType = (IAType) env.getVarType(var);
-                    AbstractFunctionCallExpression cast =
-                            new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fd));
+                    AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
+                            FunctionUtil.getFunctionInfo(fd));
                     cast.getArguments()
                             .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
                     /** enforce the required record type */
                     TypeCastUtils.setRequiredAndInputTypes(cast, requiredRecordType, actualType);
                     LogicalVariable newAssignVar = context.newVar();
-                    AssignOperator newAssignOperator =
-                            new AssignOperator(newAssignVar, new MutableObject<ILogicalExpression>(cast));
+                    AssignOperator newAssignOperator = new AssignOperator(newAssignVar,
+                            new MutableObject<ILogicalExpression>(cast));
                     newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op));
                     opRef.setValue(newAssignOperator);
                     context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
index a8368a7..f655b24 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
@@ -50,10 +50,10 @@
     }
 
     private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
-        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return false;
         }
-        ExtensionOperator extensionOp = (ExtensionOperator) op;
+        DelegateOperator extensionOp = (DelegateOperator) op;
         if (!(extensionOp.getDelegate() instanceof CommitOperator)) {
             return false;
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index c487a96..8362dd7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -66,6 +67,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
@@ -90,20 +92,28 @@
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-        AbstractLogicalOperator sinkOp = (AbstractLogicalOperator) opRef.getValue();
-        if (sinkOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+        AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
+        if (op0.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op0.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
-        if (sinkOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+        if (op0.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op0;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
         /** find the record variable */
-        InsertDeleteUpsertOperator primaryIndexModificationOp =
-                (InsertDeleteUpsertOperator) sinkOp.getInputs().get(0).getValue();
+        InsertDeleteUpsertOperator primaryIndexModificationOp = (InsertDeleteUpsertOperator) op0.getInputs().get(0)
+                .getValue();
         boolean isBulkload = primaryIndexModificationOp.isBulkload();
         ILogicalExpression newRecordExpr = primaryIndexModificationOp.getPayloadExpression().getValue();
-        List<Mutable<ILogicalExpression>> newMetaExprs =
-                primaryIndexModificationOp.getAdditionalNonFilteringExpressions();
+        List<Mutable<ILogicalExpression>> newMetaExprs = primaryIndexModificationOp
+                .getAdditionalNonFilteringExpressions();
         LogicalVariable newRecordVar;
         LogicalVariable newMetaVar = null;
 
@@ -111,9 +121,8 @@
          * inputOp is the assign operator which extracts primary keys from the input
          * variables (record or meta)
          */
-        AbstractLogicalOperator inputOp =
-                (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0).getValue();
-
+        AbstractLogicalOperator inputOp = (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0)
+                .getValue();
         newRecordVar = getRecordVar(context, inputOp, newRecordExpr, 0);
         if (newMetaExprs != null && !newMetaExprs.isEmpty()) {
             if (newMetaExprs.size() > 1) {
@@ -168,7 +177,7 @@
         // At this point, we have the data type info, and the indexes info as well
         int secondaryIndexTotalCnt = indexes.size() - 1;
         if (secondaryIndexTotalCnt > 0) {
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
         } else {
             return false;
         }
@@ -226,8 +235,8 @@
                      * is solved
                      */
                     || primaryIndexModificationOp.getOperation() == Kind.DELETE) {
-                injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType,
-                        metaType, newRecordVar, newMetaVar, primaryIndexModificationOp, false);
+                injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType, metaType,
+                        newRecordVar, newMetaVar, primaryIndexModificationOp, false);
                 if (replicateOp != null) {
                     context.computeAndSetTypeEnvironmentForOperator(replicateOp);
                 }
@@ -237,13 +246,12 @@
              * https://issues.apache.org/jira/browse/ASTERIXDB-1507
              * is solved
              */) {
-                List<LogicalVariable> beforeOpMetaVars =
-                        primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
+                List<LogicalVariable> beforeOpMetaVars = primaryIndexModificationOp
+                        .getBeforeOpAdditionalNonFilteringVars();
                 LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
-                currentTop =
-                        injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation, recType,
-                                metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
-                                currentTop, true);
+                currentTop = injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation,
+                        recType, metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
+                        currentTop, true);
             }
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
@@ -264,12 +272,11 @@
             ILogicalOperator replicateOutput;
 
             for (int i = 0; i < secondaryKeyFields.size(); i++) {
-                IndexFieldId indexFieldId =
-                        new IndexFieldId(index.getKeyFieldSourceIndicators().get(i), secondaryKeyFields.get(i));
+                IndexFieldId indexFieldId = new IndexFieldId(index.getKeyFieldSourceIndicators().get(i),
+                        secondaryKeyFields.get(i));
                 LogicalVariable skVar = fieldVarsForNewRecord.get(indexFieldId);
                 secondaryKeyVars.add(skVar);
-                secondaryExpressions.add(new MutableObject<ILogicalExpression>(
-                        new VariableReferenceExpression(skVar)));
+                secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(skVar)));
                 if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                     beforeOpSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
                             new VariableReferenceExpression(fieldVarsForBeforeOperation.get(indexFieldId))));
@@ -279,10 +286,10 @@
             IndexInsertDeleteUpsertOperator indexUpdate;
             if (index.getIndexType() != IndexType.RTREE) {
                 // Create an expression per key
-                Mutable<ILogicalExpression> filterExpression =
-                        (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null
-                                : createFilterExpression(secondaryKeyVars,
-                                        context.getOutputTypeEnvironment(currentTop), index.isEnforcingKeyFileds());
+                Mutable<ILogicalExpression> filterExpression = (primaryIndexModificationOp
+                        .getOperation() == Kind.UPSERT) ? null
+                                : createFilterExpression(secondaryKeyVars, context.getOutputTypeEnvironment(currentTop),
+                                        index.isEnforcingKeyFileds());
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
 
                 // Introduce the TokenizeOperator only when doing bulk-load,
@@ -311,8 +318,7 @@
                     // Check the field type of the secondary key.
                     IAType secondaryKeyType;
                     Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(
-                            index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0),
-                            recType);
+                            index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0), recType);
                     secondaryKeyType = keyPairType.first;
 
                     List<Object> varTypes = new ArrayList<>();
@@ -333,8 +339,7 @@
                     // TokenizeOperator to tokenize [SK, PK] pairs
                     TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
                             primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            tokenizeKeyVars,
-                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            tokenizeKeyVars, filterExpression, primaryIndexModificationOp.getOperation(),
                             primaryIndexModificationOp.isBulkload(), isPartitioned, varTypes);
                     tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                     context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
@@ -350,8 +355,8 @@
                     // When TokenizeOperator is not needed
                     indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                             primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            filterExpression,
-                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            primaryIndexModificationOp.isBulkload(),
                             primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                     : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                     indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
@@ -360,8 +365,8 @@
                     if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                         indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                         if (filteringFields != null) {
-                            indexUpdate.setBeforeOpAdditionalFilteringExpression(new MutableObject<ILogicalExpression>(
-                                    new VariableReferenceExpression(
+                            indexUpdate.setBeforeOpAdditionalFilteringExpression(
+                                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
                                             primaryIndexModificationOp.getBeforeOpFilterVar())));
                         }
                     }
@@ -472,7 +477,7 @@
             }
             if (primaryIndexModificationOp.isBulkload()) {
                 // For bulk load, we connect all fanned out insert operator to a single SINK operator
-                sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
+                op0.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
             }
 
         }
@@ -483,16 +488,15 @@
         if (!primaryIndexModificationOp.isBulkload()) {
             // If this is an upsert, we need to
             // Remove the current input to the SINK operator (It is actually already removed above)
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
             // Connect the last index update to the SINK
-            sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
+            op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
         }
         return true;
     }
 
     private LogicalVariable getRecordVar(IOptimizationContext context, AbstractLogicalOperator inputOp,
-            ILogicalExpression recordExpr,
-            int expectedRecordIndex) throws AlgebricksException {
+            ILogicalExpression recordExpr, int expectedRecordIndex) throws AlgebricksException {
         if (exprIsRecord(context.getOutputTypeEnvironment(inputOp), recordExpr)) {
             return ((VariableReferenceExpression) recordExpr).getVariableReference();
         } else {
@@ -554,12 +558,10 @@
                 ARecordType sourceType = dataset.hasMetaPart()
                         ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recType : metaType : recType;
                 LogicalVariable sourceVar = dataset.hasMetaPart()
-                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar
-                        : recordVar;
+                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar : recordVar;
                 LogicalVariable fieldVar = context.newVar();
                 // create record variable ref
-                Mutable<ILogicalExpression> varRef =
-                        new MutableObject<>(new VariableReferenceExpression(sourceVar));
+                Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(sourceVar));
                 IAType fieldType = sourceType.getSubFieldType(indexFieldId.fieldName);
                 AbstractFunctionCallExpression theFieldAccessFunc;
                 if (fieldType == null) {
@@ -567,24 +569,22 @@
                     // make handling of records with incorrect value type for this field easier and cleaner
                     context.addNotToBeInlinedVar(fieldVar);
                     // create field access
-                    AbstractFunctionCallExpression fieldAccessFunc =
-                            getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName);
+                    AbstractFunctionCallExpression fieldAccessFunc = getOpenOrNestedFieldAccessFunction(varRef,
+                            indexFieldId.fieldName);
                     // create cast
                     theFieldAccessFunc = new ScalarFunctionCallExpression(
                             FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE));
                     // The first argument is the field
-                    theFieldAccessFunc.getArguments()
-                            .add(new MutableObject<ILogicalExpression>(fieldAccessFunc));
-                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i),
-                            BuiltinType.ANY);
+                    theFieldAccessFunc.getArguments().add(new MutableObject<ILogicalExpression>(fieldAccessFunc));
+                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i), BuiltinType.ANY);
                 } else {
                     // Get the desired field position
                     int pos = indexFieldId.fieldName.size() > 1 ? -1
                             : sourceType.getFieldIndex(indexFieldId.fieldName.get(0));
                     // Field not found --> This is either an open field or a nested field. it can't be accessed by index
-                    theFieldAccessFunc =
-                            (pos == -1) ? getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
-                                    : getClosedFieldAccessFunction(varRef, pos);
+                    theFieldAccessFunc = (pos == -1)
+                            ? getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
+                            : getClosedFieldAccessFunction(varRef, pos);
                 }
                 vars.add(fieldVar);
                 exprs.add(new MutableObject<ILogicalExpression>(theFieldAccessFunc));
@@ -648,8 +648,7 @@
     }
 
     private static Mutable<ILogicalExpression> constantToMutableLogicalExpression(IAObject constantObject) {
-        return new MutableObject<>(
-                new ConstantExpression(new AsterixConstantValue(constantObject)));
+        return new MutableObject<>(new ConstantExpression(new AsterixConstantValue(constantObject)));
     }
 
     private Mutable<ILogicalExpression> createFilterExpression(List<LogicalVariable> secondaryKeyVars,
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
index 89280be..2eaad98 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.IAType;
@@ -38,6 +39,7 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -87,9 +89,16 @@
         List<LogicalVariable> producedVariables = new ArrayList<LogicalVariable>();
         LogicalVariable oldRecordVariable;
 
-        if (op1.getOperatorTag() != LogicalOperatorTag.SINK) {
+        if (op1.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op1.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
+        if (op1.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op1;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
         if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
similarity index 79%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 0c36d0b..4bbfce0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -41,14 +41,13 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
-public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
+public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
 
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
@@ -61,30 +60,32 @@
             throws AlgebricksException {
 
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.SINK) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return false;
         }
-        SinkOperator sinkOperator = (SinkOperator) op;
+        DelegateOperator eOp = (DelegateOperator) op;
+        if (!(eOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink();
 
         List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
         int datasetId = 0;
         String dataverse = null;
         String datasetName = null;
-        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) sinkOperator.getInputs().get(0).getValue();
+        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
         LogicalVariable upsertVar = null;
-        AssignOperator upsertFlagAssign = null;
         while (descendantOp != null) {
             if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
-                IndexInsertDeleteUpsertOperator indexInsertDeleteUpsertOperator = (IndexInsertDeleteUpsertOperator) descendantOp;
-                if (!indexInsertDeleteUpsertOperator.isBulkload()
-                        && indexInsertDeleteUpsertOperator.getPrevSecondaryKeyExprs() == null) {
-                    primaryKeyExprs = indexInsertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetId();
-                    dataverse = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDataverseName();
-                    datasetName = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetName();
+                IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
+                if (!operator.isBulkload() && operator.getPrevSecondaryKeyExprs() == null) {
+                    primaryKeyExprs = operator.getPrimaryKeyExpressions();
+                    datasetId = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetId();
+                    dataverse = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetName();
                     break;
                 }
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
@@ -115,18 +116,19 @@
                         orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
 
                         // AssignOperator puts in the cast var the casted record
-                        upsertFlagAssign = new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
+                        AssignOperator upsertFlagAssign = new AssignOperator(upsertVar,
+                                new MutableObject<ILogicalExpression>(orFunc));
                         // Connect the current top of the plan to the cast operator
                         upsertFlagAssign.getInputs()
-                                .add(new MutableObject<ILogicalOperator>(sinkOperator.getInputs().get(0).getValue()));
-                        sinkOperator.getInputs().clear();
-                        sinkOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
+                                .add(new MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue()));
+                        eOp.getInputs().clear();
+                        eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
                         context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
                     }
                     break;
                 }
             }
-            if (descendantOp.getInputs().size() < 1) {
+            if (descendantOp.getInputs().isEmpty()) {
                 break;
             }
             descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
@@ -137,7 +139,7 @@
         }
 
         //copy primaryKeyExprs
-        List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<>();
         for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
             VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr.getValue();
             primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
@@ -148,17 +150,17 @@
         JobId jobId = mp.getJobId();
 
         //create the logical and physical operator
-        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar);
+        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
         CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
-                primaryKeyLogicalVars, upsertVar);
+                primaryKeyLogicalVars, upsertVar, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.
-        ExtensionOperator extensionOperator = new ExtensionOperator(commitOperator);
+        DelegateOperator extensionOperator = new DelegateOperator(commitOperator);
         extensionOperator.setPhysicalOperator(commitPOperator);
 
         //update plan link
-        extensionOperator.getInputs().add(sinkOperator.getInputs().get(0));
+        extensionOperator.getInputs().add(eOp.getInputs().get(0));
         context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
         opRef.setValue(extensionOperator);
         return true;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 4a79387..b1f646a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -36,7 +36,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -187,7 +187,7 @@
         }
 
         @Override
-        public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
             return null;
         }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 17dec7c..c033214 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -23,7 +23,7 @@
 
 import org.apache.asterix.algebra.extension.IAlgebraExtensionManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 16ac80d..ec29b53 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.dataflow.data.common.AqlExpressionTypeComputer;
 import org.apache.asterix.metadata.api.IMetadataEntity;
@@ -39,6 +38,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.optimizer.rules.am.OptimizableOperatorSubTree.DataSourceType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -72,15 +72,15 @@
     private AqlMetadataProvider metadataProvider;
 
     // Function Identifier sets that retain the original field variable through each function's arguments
-    private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName =
-            ImmutableSet.of(AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS,
-                    AsterixBuiltinFunctions.SUBSTRING, AsterixBuiltinFunctions.SUBSTRING_BEFORE,
-                    AsterixBuiltinFunctions.SUBSTRING_AFTER, AsterixBuiltinFunctions.CREATE_POLYGON,
-                    AsterixBuiltinFunctions.CREATE_MBR, AsterixBuiltinFunctions.CREATE_RECTANGLE,
-                    AsterixBuiltinFunctions.CREATE_CIRCLE, AsterixBuiltinFunctions.CREATE_LINE,
-                    AsterixBuiltinFunctions.CREATE_POINT, AsterixBuiltinFunctions.NUMERIC_ADD,
-                    AsterixBuiltinFunctions.NUMERIC_SUBTRACT, AsterixBuiltinFunctions.NUMERIC_MULTIPLY,
-                    AsterixBuiltinFunctions.NUMERIC_DIVIDE, AsterixBuiltinFunctions.NUMERIC_MOD);
+    private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName = ImmutableSet.of(
+            AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS, AsterixBuiltinFunctions.SUBSTRING,
+            AsterixBuiltinFunctions.SUBSTRING_BEFORE, AsterixBuiltinFunctions.SUBSTRING_AFTER,
+            AsterixBuiltinFunctions.CREATE_POLYGON, AsterixBuiltinFunctions.CREATE_MBR,
+            AsterixBuiltinFunctions.CREATE_RECTANGLE, AsterixBuiltinFunctions.CREATE_CIRCLE,
+            AsterixBuiltinFunctions.CREATE_LINE, AsterixBuiltinFunctions.CREATE_POINT,
+            AsterixBuiltinFunctions.NUMERIC_ADD, AsterixBuiltinFunctions.NUMERIC_SUBTRACT,
+            AsterixBuiltinFunctions.NUMERIC_MULTIPLY, AsterixBuiltinFunctions.NUMERIC_DIVIDE,
+            AsterixBuiltinFunctions.NUMERIC_MOD);
 
     public abstract Map<FunctionIdentifier, List<IAccessMethod>> getAccessMethods();
 
@@ -108,7 +108,7 @@
 
     protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree,
             Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
         // Check applicability of indexes by access method type.
         while (amIt.hasNext()) {
@@ -145,15 +145,15 @@
         return list.isEmpty() ? null : list.get(0);
     }
 
-    protected List<Pair<IAccessMethod, Index>>
-            chooseAllIndex(Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
+    protected List<Pair<IAccessMethod, Index>> chooseAllIndex(
+            Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
         List<Pair<IAccessMethod, Index>> result = new ArrayList<Pair<IAccessMethod, Index>>();
         Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
         while (amIt.hasNext()) {
             Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry = amIt.next();
             AccessMethodAnalysisContext analysisCtx = amEntry.getValue();
-            Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt =
-                    analysisCtx.indexExprsAndVars.entrySet().iterator();
+            Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt = analysisCtx.indexExprsAndVars.entrySet()
+                    .iterator();
             while (indexIt.hasNext()) {
                 Map.Entry<Index, List<Pair<Integer, Integer>>> indexEntry = indexIt.next();
                 // To avoid a case where the chosen access method and a chosen
@@ -167,15 +167,14 @@
                 //                           LENGTH_PARTITIONED_NGRAM_INVIX]
                 IAccessMethod chosenAccessMethod = amEntry.getKey();
                 Index chosenIndex = indexEntry.getKey();
-                boolean isKeywordOrNgramIndexChosen =
-                        chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                                || chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX
-                                || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
-                                || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX;
+                IndexType indexType = chosenIndex.getIndexType();
+                boolean isKeywordOrNgramIndexChosen = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                        || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX
+                        || indexType == IndexType.SINGLE_PARTITION_WORD_INVIX
+                        || indexType == IndexType.SINGLE_PARTITION_NGRAM_INVIX;
 
-                if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && chosenIndex.getIndexType() == IndexType.BTREE)
-                        || (chosenAccessMethod == RTreeAccessMethod.INSTANCE
-                                && chosenIndex.getIndexType() == IndexType.RTREE)
+                if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && indexType == IndexType.BTREE)
+                        || (chosenAccessMethod == RTreeAccessMethod.INSTANCE && indexType == IndexType.RTREE)
                         || (chosenAccessMethod == InvertedIndexAccessMethod.INSTANCE && isKeywordOrNgramIndexChosen)) {
                     result.add(new Pair<IAccessMethod, Index>(chosenAccessMethod, chosenIndex));
                 }
@@ -196,8 +195,8 @@
      */
     public void pruneIndexCandidates(IAccessMethod accessMethod, AccessMethodAnalysisContext analysisCtx,
             IOptimizationContext context, IVariableTypeEnvironment typeEnvironment) throws AlgebricksException {
-        Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt =
-                analysisCtx.indexExprsAndVars.entrySet().iterator();
+        Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt = analysisCtx.indexExprsAndVars
+                .entrySet().iterator();
         // Used to keep track of matched expressions (added for prefix search)
         int numMatchedKeys = 0;
         ArrayList<Integer> matchedExpressions = new ArrayList<Integer>();
@@ -226,24 +225,22 @@
                     }
                     boolean typeMatch = true;
                     //Prune indexes based on field types
-                    List<IAType> indexedTypes = new ArrayList<IAType>();
+                    List<IAType> matchedTypes = new ArrayList<>();
                     //retrieve types of expressions joined/selected with an indexed field
                     for (int j = 0; j < optFuncExpr.getNumLogicalVars(); j++) {
                         if (j != exprAndVarIdx.second) {
-                            indexedTypes.add(optFuncExpr.getFieldType(j));
+                            matchedTypes.add(optFuncExpr.getFieldType(j));
                         }
                     }
 
-                    //add constants in case of select
-                    if (indexedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1
-                            && optFuncExpr.getNumConstantAtRuntimeExpr() > 0) {
-                        indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
+                    if (matchedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1) {
+                        matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
                                 optFuncExpr.getConstantAtRuntimeExpr(0), context.getMetadataProvider(),
                                 typeEnvironment));
                     }
 
                     //infer type of logicalExpr based on index keyType
-                    indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
+                    matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
                             optFuncExpr.getLogicalExpr(exprAndVarIdx.second), null, new IVariableTypeEnvironment() {
 
                                 @Override
@@ -257,7 +254,7 @@
                                 @Override
                                 public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
                                         List<List<LogicalVariable>> correlatedNullableVariableLists)
-                                        throws AlgebricksException {
+                                                throws AlgebricksException {
                                     if (var.equals(optFuncExpr.getSourceVar(exprAndVarIdx.second))) {
                                         return keyType;
                                     }
@@ -285,16 +282,16 @@
                     boolean jaccardSimilarity = optFuncExpr.getFuncExpr().getFunctionIdentifier().getName()
                             .startsWith("similarity-jaccard-check");
 
-                    for (int j = 0; j < indexedTypes.size(); j++) {
-                        for (int k = j + 1; k < indexedTypes.size(); k++) {
-                            typeMatch &= isMatched(indexedTypes.get(j), indexedTypes.get(k), jaccardSimilarity);
+                    for (int j = 0; j < matchedTypes.size(); j++) {
+                        for (int k = j + 1; k < matchedTypes.size(); k++) {
+                            typeMatch &= isMatched(matchedTypes.get(j), matchedTypes.get(k), jaccardSimilarity);
                         }
                     }
 
                     // Check if any field name in the optFuncExpr matches.
                     if (optFuncExpr.findFieldName(keyField) != -1) {
-                        foundKeyField =
-                                typeMatch && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan();
+                        foundKeyField = typeMatch
+                                && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan();
                         if (foundKeyField) {
                             matchedExpressions.add(exprAndVarIdx.first);
                             numMatchedKeys++;
@@ -369,8 +366,8 @@
                 continue;
             }
             AbstractFunctionCallExpression argFuncExpr = (AbstractFunctionCallExpression) argExpr;
-            boolean matchFound =
-                    analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context, typeEnvironment);
+            boolean matchFound = analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context,
+                    typeEnvironment);
             found = found || matchFound;
         }
         return found;
@@ -435,14 +432,13 @@
     protected boolean fillIndexExprs(List<Index> datasetIndexes, List<String> fieldName, IAType fieldType,
             IOptimizableFuncExpr optFuncExpr, int matchedFuncExprIndex, int varIdx,
             OptimizableOperatorSubTree matchedSubTree, AccessMethodAnalysisContext analysisCtx)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         List<Index> indexCandidates = new ArrayList<Index>();
         // Add an index to the candidates if one of the indexed fields is
         // fieldName
         for (Index index : datasetIndexes) {
             // Need to also verify the index is pending no op
-            if (index.getKeyFieldNames().contains(fieldName)
-                    && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
+            if (index.getKeyFieldNames().contains(fieldName) && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
                 indexCandidates.add(index);
                 if (optFuncExpr.getFieldType(varIdx) == BuiltinType.AMISSING
                         || optFuncExpr.getFieldType(varIdx) == BuiltinType.ANY) {
@@ -540,8 +536,8 @@
                 return;
             }
         }
-        IAType fieldType =
-                (IAType) context.getOutputTypeEnvironment(unnestOp).getType(optFuncExpr.getLogicalExpr(funcVarIndex));
+        IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp)
+                .getType(optFuncExpr.getLogicalExpr(funcVarIndex));
         // Set the fieldName in the corresponding matched function
         // expression.
         optFuncExpr.setFieldName(funcVarIndex, fieldName);
@@ -571,16 +567,14 @@
             // Remember matching subtree.
             optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
 
-            List<String> fieldName =
-                    getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
-                            subTree.getRecordType(), optVarIndex,
-                            optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(),
-                            datasetRecordVar, subTree.getMetaRecordType(), datasetMetaVar);
+            List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
+                    subTree.getRecordType(), optVarIndex,
+                    optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(), datasetRecordVar,
+                    subTree.getMetaRecordType(), datasetMetaVar);
             if (fieldName == null) {
                 continue;
             }
-            IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp)
-                    .getType(optFuncExpr.getLogicalExpr(optVarIndex));
+            IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp).getVarType(var);
             // Set the fieldName in the corresponding matched
             // function expression.
             optFuncExpr.setFieldName(optVarIndex, fieldName);
@@ -597,7 +591,7 @@
     private void matchVarsFromOptFuncExprToDataSourceScan(IOptimizableFuncExpr optFuncExpr, int optFuncExprIndex,
             List<Index> datasetIndexes, List<LogicalVariable> dsVarList, OptimizableOperatorSubTree subTree,
             AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean fromAdditionalDataSource)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         for (int varIndex = 0; varIndex < dsVarList.size(); varIndex++) {
             LogicalVariable var = dsVarList.get(varIndex);
             int funcVarIndex = optFuncExpr.findLogicalVar(var);
@@ -615,16 +609,15 @@
                 // Check whether this variable is PK, not a record variable.
                 if (varIndex <= subTreePKs.size() - 1) {
                     fieldName = subTreePKs.get(varIndex);
-                    fieldType =
-                            (IAType) context.getOutputTypeEnvironment(
-                                    subTree.getDataSourceRef().getValue()).getVarType(var);
+                    fieldType = (IAType) context.getOutputTypeEnvironment(subTree.getDataSourceRef().getValue())
+                            .getVarType(var);
                 }
             } else {
                 // Need to check additional dataset one by one
                 for (int i = 0; i < subTree.getIxJoinOuterAdditionalDatasets().size(); i++) {
                     if (subTree.getIxJoinOuterAdditionalDatasets().get(i) != null) {
-                        subTreePKs = DatasetUtils.getPartitioningKeys(
-                                subTree.getIxJoinOuterAdditionalDatasets().get(i));
+                        subTreePKs = DatasetUtils
+                                .getPartitioningKeys(subTree.getIxJoinOuterAdditionalDatasets().get(i));
 
                         // Check whether this variable is PK, not a record variable.
                         if (subTreePKs.contains(var) && varIndex <= subTreePKs.size() - 1) {
@@ -667,11 +660,10 @@
      *
      * @throws AlgebricksException
      */
-    protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr,
-            OptimizableOperatorSubTree subTree, int opIndex, int assignVarIndex, ARecordType recordType,
-            int funcVarIndex, ILogicalExpression parentFuncExpr, LogicalVariable recordVar,
-            ARecordType metaType, LogicalVariable metaVar)
-            throws AlgebricksException {
+    protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree subTree,
+            int opIndex, int assignVarIndex, ARecordType recordType, int funcVarIndex,
+            ILogicalExpression parentFuncExpr, LogicalVariable recordVar, ARecordType metaType, LogicalVariable metaVar)
+                    throws AlgebricksException {
         // Get expression corresponding to opVar at varIndex.
         AbstractLogicalExpression expr = null;
         AbstractFunctionCallExpression childFuncExpr = null;
@@ -679,6 +671,10 @@
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
             expr = (AbstractLogicalExpression) assignOp.getExpressions().get(assignVarIndex).getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                //Otherwise the cast for childFuncExpr would fail
+                return null;
+            }
             childFuncExpr = (AbstractFunctionCallExpression) expr;
         } else {
             UnnestOperator unnestOp = (UnnestOperator) op;
@@ -723,8 +719,8 @@
                 return null;
             }
             ConstantExpression constExpr = (ConstantExpression) nameArg;
-            AOrderedList orderedNestedFieldName =
-                    (AOrderedList) ((AsterixConstantValue) constExpr.getValue()).getObject();
+            AOrderedList orderedNestedFieldName = (AOrderedList) ((AsterixConstantValue) constExpr.getValue())
+                    .getObject();
             nestedAccessFieldName = new ArrayList<String>();
             for (int i = 0; i < orderedNestedFieldName.size(); i++) {
                 nestedAccessFieldName.add(((AString) orderedNestedFieldName.getItem(i)).getStringValue());
@@ -733,8 +729,8 @@
             isByName = true;
         }
         if (isFieldAccess) {
-            LogicalVariable sourceVar =
-                    ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue()).getVariableReference();
+            LogicalVariable sourceVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+                    .getVariableReference();
             optFuncExpr.setLogicalExpr(funcVarIndex, parentFuncExpr);
             int[] assignAndExpressionIndexes = null;
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 874cc7c..23e45c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -56,7 +56,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -490,7 +490,7 @@
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index eeb2c2a..d3a0c0f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -41,7 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -255,7 +255,7 @@
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index ccf0aeb..44bfbe4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -143,7 +143,7 @@
     }
 
     @Override
-    public Boolean visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Boolean visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return false;
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index b184774..98c717c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -295,12 +295,17 @@
         private final String datasetName;
         private final Query query;
         private final int varCounter;
+        VariableExpr var;
+        Query returnQuery;
 
-        public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter) {
+        public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter,
+                VariableExpr var, Query returnQuery) {
             this.dataverseName = dataverseName;
             this.datasetName = datasetName;
             this.query = query;
             this.varCounter = varCounter;
+            this.var = var;
+            this.returnQuery = returnQuery;
         }
 
         @Override
@@ -321,6 +326,14 @@
             return query;
         }
 
+        public VariableExpr getVar() {
+            return var;
+        }
+
+        public Query getReturnQuery() {
+            return returnQuery;
+        }
+
         @Override
         public byte getKind() {
             return Statement.Kind.INSERT;
@@ -329,8 +342,9 @@
 
     public static class CompiledUpsertStatement extends CompiledInsertStatement {
 
-        public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter) {
-            super(dataverseName, datasetName, query, varCounter);
+        public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter,
+                VariableExpr var, Query returnQuery) {
+            super(dataverseName, datasetName, query, varCounter, var, returnQuery);
         }
 
         @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 1b528b9..149656a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -116,6 +116,8 @@
      * @param dmlStatement
      *            The data modification statement when the query results in a modification to a dataset
      * @return the compiled {@code JobSpecification}
+     * @param returnQuery
+     *            In the case of dml, the user may run a query on affected data
      * @throws AsterixException
      * @throws RemoteException
      * @throws AlgebricksException
@@ -124,7 +126,7 @@
      */
     JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement dmlStatement)
-            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException;
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException;
 
     /**
      * returns the active dataverse for an entity or a statement
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 09a0476..9879da8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -27,11 +27,12 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -40,15 +41,15 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.lang.aql.util.RangeMapBuilder;
 import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.Expression.Kind;
 import org.apache.asterix.lang.common.base.ILangExpression;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Expression.Kind;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.clause.LimitClause;
 import org.apache.asterix.lang.common.clause.OrderbyClause;
-import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.common.clause.OrderbyClause.OrderModifier;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.FieldAccessor;
 import org.apache.asterix.lang.common.expression.FieldBinding;
@@ -56,14 +57,14 @@
 import org.apache.asterix.lang.common.expression.IfExpr;
 import org.apache.asterix.lang.common.expression.IndexAccessor;
 import org.apache.asterix.lang.common.expression.ListConstructor;
+import org.apache.asterix.lang.common.expression.ListConstructor.Type;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.OperatorExpr;
 import org.apache.asterix.lang.common.expression.QuantifiedExpression;
+import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier;
 import org.apache.asterix.lang.common.expression.RecordConstructor;
 import org.apache.asterix.lang.common.expression.UnaryExpr;
 import org.apache.asterix.lang.common.expression.VariableExpr;
-import org.apache.asterix.lang.common.expression.ListConstructor.Type;
-import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier;
 import org.apache.asterix.lang.common.literal.StringLiteral;
 import org.apache.asterix.lang.common.statement.FunctionDecl;
 import org.apache.asterix.lang.common.statement.Query;
@@ -74,13 +75,13 @@
 import org.apache.asterix.lang.common.visitor.base.AbstractQueryExpressionVisitor;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.declared.AqlSourceId;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.LoadableDataSource;
 import org.apache.asterix.metadata.declared.ResultSetDataSink;
 import org.apache.asterix.metadata.declared.ResultSetSinkId;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.Function;
@@ -96,8 +97,10 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.util.FunctionCollection;
 import org.apache.asterix.translator.util.PlanTranslationUtil;
@@ -116,15 +119,15 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
-import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
@@ -133,6 +136,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
@@ -140,13 +144,13 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -286,15 +290,30 @@
     @Override
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
             throws AlgebricksException {
-        Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this,
-                new MutableObject<>(new EmptyTupleSourceOperator()));
+        return translate(expr, outputDatasetName, stmt, null);
+    }
+
+    public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
+            ILogicalOperator baseOp) throws AlgebricksException {
+        MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
+        if (baseOp != null) {
+            base = new MutableObject<>(baseOp);
+        }
+        Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, base);
         ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<>();
         ILogicalOperator topOp = p.first;
-        ProjectOperator project = (ProjectOperator) topOp;
-        LogicalVariable unnestVar = project.getVariables().get(0);
-        LogicalVariable resVar = project.getVariables().get(0);
 
         if (outputDatasetName == null) {
+            LogicalVariable resVar;
+            if (topOp instanceof ProjectOperator) {
+                resVar = ((ProjectOperator) topOp).getVariables().get(0);
+            } else if (topOp instanceof AssignOperator) {
+                resVar = ((AssignOperator) topOp).getVariables().get(0);
+            } else if (topOp instanceof AggregateOperator) {
+                resVar = ((AggregateOperator) topOp).getVariables().get(0);
+            } else {
+                throw new AlgebricksException("Invalid returning query");
+            }
             FileSplit outputFileSplit = metadataProvider.getOutputFile();
             if (outputFileSplit == null) {
                 outputFileSplit = getDefaultOutputFileLocation();
@@ -305,8 +324,9 @@
             writeExprList.add(new MutableObject<>(new VariableReferenceExpression(resVar)));
             ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
             ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
-            topOp = new DistributeResultOperator(writeExprList, sink);
-            topOp.getInputs().add(new MutableObject<>(project));
+            DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink);
+            newTop.getInputs().add(new MutableObject<>(topOp));
+            topOp = newTop;
 
             // Retrieve the Output RecordType (if any) and store it on
             // the DistributeResultOperator
@@ -315,6 +335,10 @@
                 topOp.getAnnotations().put("output-record-type", outputRecordType);
             }
         } else {
+            ProjectOperator project = (ProjectOperator) topOp;
+            LogicalVariable unnestVar = project.getVariables().get(0);
+            LogicalVariable resVar = project.getVariables().get(0);
+
             /**
              * add the collection-to-sequence right before the project,
              * because dataset only accept non-collection records
@@ -380,12 +404,12 @@
             switch (stmt.getKind()) {
                 case Statement.Kind.INSERT:
                     leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading,
-                            additionalFilteringExpressions, assign);
+                            additionalFilteringExpressions, assign, stmt);
                     break;
                 case Statement.Kind.UPSERT:
                     leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading,
                             additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, project, exprs,
-                            resVar, additionalFilteringAssign);
+                            resVar, additionalFilteringAssign, stmt);
                     break;
                 case Statement.Kind.DELETE:
                     leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
@@ -418,7 +442,7 @@
                 varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
         insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
         insertOp.getInputs().add(new MutableObject<>(assign));
-        SinkOperator leafOperator = new SinkOperator();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(insertOp));
         return leafOperator;
     }
@@ -426,7 +450,7 @@
     private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         if (targetDatasource.getDataset().hasMetaPart()) {
             throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
                     + ": delete from dataset is not supported on Datasets with Meta records");
@@ -435,7 +459,7 @@
                 varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
         deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
         deleteOp.getInputs().add(new MutableObject<>(assign));
-        SinkOperator leafOperator = new SinkOperator();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(deleteOp));
         return leafOperator;
     }
@@ -528,7 +552,7 @@
             project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
         }
         feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        SinkOperator leafOperator = new SinkOperator();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
         return leafOperator;
     }
@@ -537,14 +561,20 @@
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             List<String> additionalFilteringField, LogicalVariable unnestVar, ProjectOperator project,
-            List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign)
-            throws AlgebricksException {
+            List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign,
+            ICompiledDmlStatement stmt) throws AlgebricksException {
         if (!targetDatasource.getDataset().allow(project, Dataset.OP_UPSERT)) {
             throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
                     + ": upsert into dataset is not supported on Datasets with Meta records");
         }
+        CompiledUpsertStatement compiledUpsert = (CompiledUpsertStatement) stmt;
+        InsertDeleteUpsertOperator upsertOp;
+        ILogicalOperator leafOperator;
         if (targetDatasource.getDataset().hasMetaPart()) {
-            InsertDeleteUpsertOperator feedModificationOp;
+            if (compiledUpsert.getReturnQuery() != null) {
+                throw new AlgebricksException("Returning not allowed on datasets with Meta records");
+
+            }
             AssignOperator metaAndKeysAssign;
             List<LogicalVariable> metaAndKeysVars;
             List<Mutable<ILogicalExpression>> metaAndKeysExprs;
@@ -575,71 +605,113 @@
                 }
             }
             // A change feed, we don't need the assign to access PKs
-            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
-                    metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExpSingletonList,
+                    InsertDeleteUpsertOperator.Kind.UPSERT, false);
             // Create and add a new variable used for representing the original record
-            feedModificationOp.setPrevRecordVar(context.newVar());
-            feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
+            upsertOp.setPrevRecordVar(context.newVar());
+            upsertOp.setPrevRecordType(targetDatasource.getItemType());
             if (targetDatasource.getDataset().hasMetaPart()) {
                 List<LogicalVariable> metaVars = new ArrayList<>();
                 metaVars.add(context.newVar());
-                feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
+                upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
                 List<Object> metaTypes = new ArrayList<>();
                 metaTypes.add(targetDatasource.getMetaItemType());
-                feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+                upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
             }
 
             if (additionalFilteringField != null) {
-                feedModificationOp.setPrevFilterVar(context.newVar());
-                feedModificationOp.setPrevFilterType(
+                upsertOp.setPrevFilterVar(context.newVar());
+                upsertOp.setPrevFilterType(
                         ((ARecordType) targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
                 additionalFilteringAssign.getInputs().clear();
                 additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
-                feedModificationOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+                upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
             } else {
-                feedModificationOp.getInputs().add(assign.getInputs().get(0));
+                upsertOp.getInputs().add(assign.getInputs().get(0));
             }
             metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
             metaAndKeysAssign.getInputs().add(project.getInputs().get(0));
             project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
-            feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-            SinkOperator leafOperator = new SinkOperator();
-            leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
-            return leafOperator;
+            upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+            leafOperator = new DelegateOperator(new CommitOperator(true));
+            leafOperator.getInputs().add(new MutableObject<>(upsertOp));
+
         } else {
-            InsertDeleteUpsertOperator feedModificationOp;
-            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
-            feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-            feedModificationOp.getInputs().add(new MutableObject<>(assign));
+            upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+            upsertOp.getInputs().add(new MutableObject<>(assign));
             // Create and add a new variable used for representing the original record
             ARecordType recordType = (ARecordType) targetDatasource.getItemType();
-            feedModificationOp.setPrevRecordVar(context.newVar());
-            feedModificationOp.setPrevRecordType(recordType);
+            upsertOp.setPrevRecordVar(context.newVar());
+            upsertOp.setPrevRecordType(recordType);
             if (additionalFilteringField != null) {
-                feedModificationOp.setPrevFilterVar(context.newVar());
-                feedModificationOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+                upsertOp.setPrevFilterVar(context.newVar());
+                upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
             }
-            SinkOperator leafOperator = new SinkOperator();
-            leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
-            return leafOperator;
+
+            if (compiledUpsert.getReturnQuery() != null) {
+                leafOperator = createReturningQuery(compiledUpsert, upsertOp);
+
+            } else {
+                leafOperator = new DelegateOperator(new CommitOperator(true));
+                leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertOp));
+            }
         }
+        return leafOperator;
+
+    }
+
+    private ILogicalOperator createReturningQuery(CompiledInsertStatement compiledInsert,
+            InsertDeleteUpsertOperator insertOp) throws AlgebricksException {
+        //Make the id of the insert var point to the record variable
+        context.newVar(compiledInsert.getVar());
+        context.setVar(compiledInsert.getVar(),
+                ((VariableReferenceExpression) insertOp.getPayloadExpression().getValue()).getVariableReference());
+        // context
+
+        ILogicalPlan planAfterInsert = translate(compiledInsert.getReturnQuery(), null, null, insertOp);
+
+        ILogicalOperator finalRoot = planAfterInsert.getRoots().get(0).getValue();
+        ILogicalOperator op;
+        for (op = finalRoot;; op = op.getInputs().get(0).getValue()) {
+            if (op.getInputs().size() != 1) {
+                throw new AlgebricksException("Cannot have a multi-branch returning query");
+            }
+            if (op.getInputs().get(0).getValue() instanceof InsertDeleteUpsertOperator) {
+                break;
+            }
+        }
+
+        op.getInputs().clear();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(false));
+        leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+        op.getInputs().add(new MutableObject<>(leafOperator));
+        leafOperator = finalRoot;
+        return leafOperator;
     }
 
     private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
-            throws AlgebricksException {
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
+            ICompiledDmlStatement stmt) throws AlgebricksException {
         if (targetDatasource.getDataset().hasMetaPart()) {
             throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
                     + ": insert into dataset is not supported on Datasets with Meta records");
         }
+        ILogicalOperator leafOperator;
         InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
                 varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
         insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        insertOp.getInputs().add(new MutableObject<>(assign));
-        SinkOperator leafOperator = new SinkOperator();
-        leafOperator.getInputs().add(new MutableObject<>(insertOp));
+        insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+        CompiledInsertStatement compiledInsert = (CompiledInsertStatement) stmt;
+        if (compiledInsert.getReturnQuery() != null) {
+            leafOperator = createReturningQuery(compiledInsert, insertOp);
+
+        } else {
+            leafOperator = new DelegateOperator(new CommitOperator(true));
+            leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+        }
         return leafOperator;
     }
 
@@ -880,15 +952,15 @@
 
         gOp.getInputs().add(topOp);
         for (Entry<Expression, VariableExpr> entry : gc.getWithVarMap().entrySet()) {
-            Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression(
-                        entry.getKey(), new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
-                List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression(entry.getKey(),
+                    new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
+            List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
             flArgs.add(new MutableObject<>(listifyInput.first));
             AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions
-                        .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
+                    .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
             LogicalVariable aggVar = context.newVar();
             AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(aggVar),
-                        mkSingletonArrayList(new MutableObject<>(fListify)));
+                    mkSingletonArrayList(new MutableObject<>(fListify)));
 
             agg.getInputs().add(listifyInput.second);
 
@@ -945,8 +1017,8 @@
         LogicalVariable unnestVar = context.newVar();
         UnnestOperator unnestOp = new UnnestOperator(unnestVar,
                 new MutableObject<>(new UnnestingFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), Collections
-                                .singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
+                        Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
         unnestOp.getInputs().add(new MutableObject<>(assignOp));
 
         // Produces the final result.
@@ -1514,7 +1586,7 @@
                     // There is a shared operator reference in the query plan.
                     // Deep copies the child plan.
                     LogicalOperatorDeepCopyWithNewVariablesVisitor visitor =
-                            new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
+                        new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
                     ILogicalOperator newChild = childRef.getValue().accept(visitor, null);
                     LinkedHashMap<LogicalVariable, LogicalVariable> cloneVarMap = visitor
                             .getInputToOutputVariableMapping();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index 78c68e1..6c8019d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -25,21 +25,14 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.util.Collection;
 
 import javax.imageio.ImageIO;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
-
 public class FeedServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
-    private static final String FEED_EXTENSION_NAME = "Feed";
 
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
@@ -89,49 +82,4 @@
         PrintWriter out = response.getWriter();
         out.println(outStr);
     }
-
-    @SuppressWarnings("unused")
-    private void insertTable(StringBuilder html, Collection<FeedActivity> list) {
-    }
-
-    @SuppressWarnings("null")
-    private void insertRow(StringBuilder html, FeedActivity activity) {
-        String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS);
-        String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS);
-        String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
-
-        FeedConnectionId connectionId = new FeedConnectionId(
-                new EntityId(FEED_EXTENSION_NAME, activity.getDataverseName(), activity.getFeedName()),
-                activity.getDatasetName());
-        int intakeRate = 0;
-        int storeRate = 0;
-
-        html.append("<tr>");
-        html.append("<td>" + activity.getFeedName() + "</td>");
-        html.append("<td>" + activity.getDatasetName() + "</td>");
-        html.append("<td>" + activity.getConnectTimestamp() + "</td>");
-        //html.append("<td>" + insertLink(html, FeedDashboardServlet.getParameterizedURL(activity), "Details") + "</td>");
-        html.append("<td>" + intake + "</td>");
-        html.append("<td>" + compute + "</td>");
-        html.append("<td>" + store + "</td>");
-        String color = "black";
-        if (intakeRate > storeRate) {
-            color = "red";
-        }
-        if (intakeRate < 0) {
-            html.append("<td>" + "UNKNOWN" + "</td>");
-        } else {
-            html.append("<td>" + insertColoredText("" + intakeRate, color) + " rec/sec" + "</td>");
-        }
-        if (storeRate < 0) {
-            html.append("<td>" + "UNKNOWN" + "</td>");
-        } else {
-            html.append("<td>" + insertColoredText("" + storeRate, color) + " rec/sec" + "</td>");
-        }
-        html.append("</tr>");
-    }
-
-    private String insertColoredText(String s, String color) {
-        return "<font color=\"" + color + "\">" + s + "</font>";
-    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f4ef1a1..8a25888 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -65,19 +65,19 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.api.IFeed.FeedType;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedEventsListener;
 import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -342,7 +342,12 @@
                         break;
                     case Statement.Kind.INSERT:
                     case Statement.Kind.UPSERT:
-                        handleInsertUpsertStatement(metadataProvider, stmt, hcc);
+                        if (((InsertStatement) stmt).getReturnQuery() != null) {
+                            metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                            metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
+                                    || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                        }
+                        handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false);
                         break;
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc);
@@ -393,7 +398,8 @@
                         // No op
                         break;
                     case Statement.Kind.EXTENSION:
-                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc);
+                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc, hdc, resultDelivery, stats,
+                                resultSetIdCounter);
                         break;
                     default:
                         throw new AsterixException("Unknown function");
@@ -506,7 +512,7 @@
         }
     }
 
-    protected void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws AsterixException, Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
         DatasetDecl dd = (DatasetDecl) stmt;
@@ -602,8 +608,7 @@
                     break;
                 case EXTERNAL:
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getProperties();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
 
                     datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
                             ExternalDatasetTransactionState.COMMIT);
@@ -705,7 +710,7 @@
         StringBuilder builder = null;
         IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
-            if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+            if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                 if (builder == null) {
                     builder = new StringBuilder();
                 }
@@ -730,8 +735,7 @@
         }
     }
 
-    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse,
-            MetadataTransactionContext mdTxnCtx)
+    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
             throws AsterixException {
         int nodegroupCardinality;
         String nodegroupName;
@@ -984,8 +988,7 @@
 
             ARecordType enforcedType = null;
             if (stmtCreateIndex.isEnforced()) {
-                enforcedType = createEnforcedType(aRecordType,
-                        Lists.newArrayList(index));
+                enforcedType = createEnforcedType(aRecordType, Lists.newArrayList(index));
             }
 
             // #. prepare to create the index artifact in NC.
@@ -1350,7 +1353,7 @@
         return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
     }
 
-    protected void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
@@ -1369,8 +1372,8 @@
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                            + dataverseName + ".");
                 }
             }
 
@@ -1424,7 +1427,7 @@
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                     throw new AsterixException(
                             "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
                 }
@@ -1482,8 +1485,7 @@
                 } else {
                     CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                             indexes.get(j).getIndexName());
-                    jobsToExecute.add(
-                            ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
+                    jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
                 }
             }
 
@@ -1547,7 +1549,7 @@
             IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                     if (builder == null) {
                         builder = new StringBuilder();
                     }
@@ -1555,9 +1557,8 @@
                 }
             }
             if (builder != null) {
-                throw new AsterixException(
-                        "Dataset" + datasetName + " is currently being fed into by the following active entities: "
-                                + builder.toString());
+                throw new AsterixException("Dataset" + datasetName
+                        + " is currently being fed into by the following active entities: " + builder.toString());
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1578,11 +1579,9 @@
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(),
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                IMetadataEntity.PENDING_DROP_OP));
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1644,11 +1643,9 @@
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(),
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                IMetadataEntity.PENDING_DROP_OP));
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1707,9 +1704,8 @@
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index("
-                            + dataverseName + "." + datasetName + "."
-                            + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1748,8 +1744,7 @@
         }
     }
 
-    protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws Exception {
+    protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
         NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
         String nodegroupName = stmtDelete.getNodeGroupName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1859,40 +1854,61 @@
         }
     }
 
-    protected void handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+    public JobSpecification handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
+            IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
 
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
+
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
         Query query = stmtInsertUpsert.getQuery();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        if (stmtInsertUpsert.getReturnQuery() != null) {
+            if (!stmtInsertUpsert.getReturnQuery().getDatasets().isEmpty()) {
+                throw new AsterixException("Cannot use datasets in an insert returning query");
+            }
+            // returnQuery Rewriting (happens under the same ongoing metadata transaction)
+            Pair<Query, Integer> rewrittenReturnQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider,
+                    stmtInsertUpsert.getReturnQuery(), sessionConfig);
+
+            stmtInsertUpsert.getQuery().setVarCounter(rewrittenReturnQuery.first.getVarCounter());
+            stmtInsertUpsert.setRewrittenReturnQuery(rewrittenReturnQuery.first);
+            stmtInsertUpsert.addToVarCounter(rewrittenReturnQuery.second);
+        }
+
         MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
                 dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets());
-
+        JobSpecification compiled = null;
         try {
             metadataProvider.setWriteTransaction(true);
             CompiledInsertStatement clfrqs = null;
             switch (stmtInsertUpsert.getKind()) {
                 case Statement.Kind.INSERT:
                     clfrqs = new CompiledInsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
-                            query, stmtInsertUpsert.getVarCounter());
+                            query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(),
+                            stmtInsertUpsert.getReturnQuery());
                     break;
                 case Statement.Kind.UPSERT:
                     clfrqs = new CompiledUpsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
-                            query, stmtInsertUpsert.getVarCounter());
+                            query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(),
+                            stmtInsertUpsert.getReturnQuery());
                     break;
                 default:
                     throw new AlgebricksException("Unsupported statement type " + stmtInsertUpsert.getKind());
             }
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
+            compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            if (compiled != null) {
-                JobUtils.runJob(hcc, compiled, true);
+            if (compiled != null && !compileOnly) {
+                if (stmtInsertUpsert.getReturnQuery() != null) {
+                    handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
+                } else {
+                    JobUtils.runJob(hcc, compiled, true);
+                }
             }
 
         } catch (Exception e) {
@@ -1905,9 +1921,10 @@
                     dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
                     query.getDatasets());
         }
+        return compiled;
     }
 
-    protected void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
@@ -1948,7 +1965,7 @@
     @Override
     public JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt)
-            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -2034,12 +2051,10 @@
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
                 FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE
-                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse,
-                                cfps.getSourcePolicyName());
+                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
-                    sourceFeedPolicy =
-                            MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
-                                    MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+                    sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                            MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
                     if (sourceFeedPolicy == null) {
                         throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName());
                     }
@@ -2158,7 +2173,7 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         boolean subscriberRegistered = false;
-        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName());
         FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
@@ -2211,7 +2226,7 @@
                         triple.third.get(0), pair.first);
                 pair.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
                 JobUtils.runJob(hcc, pair.first, false);
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
             } else {
                 for (IFeedJoint fj : triple.third) {
                     listener.registerFeedJoint(fj, 0);
@@ -2219,9 +2234,9 @@
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
             if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
+                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2251,7 +2266,7 @@
      */
     protected Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws AsterixException {
+                    throws AsterixException {
         IFeedJoint sourceFeedJoint;
         FeedConnectionRequest request;
         List<String> functionsToApply = new ArrayList<>();
@@ -2349,10 +2364,10 @@
         Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
-        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
         FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
                 .getActiveEntityListener(entityId);
-        if (listener == null || !listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+        if (listener == null || !listener.isEntityUsingDataset(dataverseName, datasetName)) {
             throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid operation!");
         }
@@ -2372,7 +2387,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             JobUtils.runJob(hcc, jobSpec, true);
-            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -2428,8 +2443,8 @@
             if (compiled != null) {
                 FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
                         .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId());
-                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(
-                        bfs.getSubscriptionRequest().getReceivingFeedId(), null, ActivityState.ACTIVE,
+                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(),
+                        null, ActivityState.ACTIVE,
                         new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset),
                         listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties());
                 alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
@@ -2484,8 +2499,7 @@
                     dataverseName);
             jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
-            ARecordType enforcedType = createEnforcedType(
-                    aRecordType, indexes);
+            ARecordType enforcedType = createEnforcedType(aRecordType, indexes);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -2541,59 +2555,25 @@
         jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
     }
 
-    protected void handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) throws Exception {
-
+    protected JobSpecification handleQuery(AqlMetadataProvider metadataProvider, Query query,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
+                    throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets());
+        JobSpecification compiled;
         try {
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, null);
+            compiled = rewriteCompileQuery(metadataProvider, query, null);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
             if (query.isExplain()) {
                 sessionConfig.out().flush();
-                return;
+                return compiled;
             } else if (sessionConfig.isExecuteQuery() && compiled != null) {
-                if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) {
-                    GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1));
-                }
-                JobId jobId = JobUtils.runJob(hcc, compiled, false);
-
-                JSONObject response = new JSONObject();
-                switch (resultDelivery) {
-                    case ASYNC:
-                        JSONArray handle = new JSONArray();
-                        handle.put(jobId.getId());
-                        handle.put(metadataProvider.getResultSetId().getId());
-                        response.put("handle", handle);
-                        sessionConfig.out().print(response);
-                        sessionConfig.out().flush();
-                        hcc.waitForCompletion(jobId);
-                        break;
-                    case SYNC:
-                        hcc.waitForCompletion(jobId);
-                        ResultReader resultReader = new ResultReader(hdc);
-                        resultReader.open(jobId, metadataProvider.getResultSetId());
-                        ResultUtil.displayResults(resultReader, sessionConfig, stats,
-                                metadataProvider.findOutputRecordType());
-                        break;
-                    case ASYNC_DEFERRED:
-                        handle = new JSONArray();
-                        handle.put(jobId.getId());
-                        handle.put(metadataProvider.getResultSetId().getId());
-                        response.put("handle", handle);
-                        hcc.waitForCompletion(jobId);
-                        sessionConfig.out().print(response);
-                        sessionConfig.out().flush();
-                        break;
-                    default:
-                        break;
-
-                }
+                handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
             }
         } catch (Exception e) {
             LOGGER.log(Level.INFO, e.getMessage(), e);
@@ -2606,6 +2586,47 @@
             // release external datasets' locks acquired during compilation of the query
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
+        return compiled;
+    }
+
+    private void handleQueryResult(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, JobSpecification compiled, ResultDelivery resultDelivery, Stats stats)
+                    throws Exception {
+        if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) {
+            GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1));
+        }
+        JobId jobId = JobUtils.runJob(hcc, compiled, false);
+
+        JSONObject response = new JSONObject();
+        switch (resultDelivery) {
+            case ASYNC:
+                JSONArray handle = new JSONArray();
+                handle.put(jobId.getId());
+                handle.put(metadataProvider.getResultSetId().getId());
+                response.put("handle", handle);
+                sessionConfig.out().print(response);
+                sessionConfig.out().flush();
+                hcc.waitForCompletion(jobId);
+                break;
+            case SYNC:
+                hcc.waitForCompletion(jobId);
+                ResultReader resultReader = new ResultReader(hdc);
+                resultReader.open(jobId, metadataProvider.getResultSetId());
+                ResultUtil.displayResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType());
+                break;
+            case ASYNC_DEFERRED:
+                handle = new JSONArray();
+                handle.put(jobId.getId());
+                handle.put(metadataProvider.getResultSetId().getId());
+                response.put("handle", handle);
+                hcc.waitForCompletion(jobId);
+                sessionConfig.out().print(response);
+                sessionConfig.out().flush();
+                break;
+            default:
+                break;
+
+        }
     }
 
     protected void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt)
@@ -2876,8 +2897,7 @@
                 break;
             default:
                 throw new AlgebricksException(
-                        "The system \"" + runStmt.getSystem() +
-                                "\" specified in your run statement is not supported.");
+                        "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
         }
 
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a548b3a..a806532 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -170,7 +170,7 @@
                 indexOpDesc, ctx, PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider,
                 op, true);
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
-                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
+                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
         return insertOp;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index 6e3f94c..2716859 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -30,6 +30,7 @@
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.app.cc.CompilerExtensionManager;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -38,6 +39,7 @@
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.test.base.AsterixTestHelper;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.runtime.HDFSCluster;
@@ -66,15 +68,16 @@
             + "optimizerts" + SEPARATOR;
     private static final String PATH_QUERIES = PATH_BASE + "queries" + SEPARATOR;
     private static final String PATH_EXPECTED = PATH_BASE + "results" + SEPARATOR;
-    private static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR;
+    protected static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR;
 
     private static final ArrayList<String> ignore = AsterixTestHelper.readFile(FILENAME_IGNORE, PATH_BASE);
     private static final ArrayList<String> only = AsterixTestHelper.readFile(FILENAME_ONLY, PATH_BASE);
-    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    protected static String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final ILangCompilationProvider aqlCompilationProvider = new AqlCompilationProvider();
     private static final ILangCompilationProvider sqlppCompilationProvider = new SqlppCompilationProvider();
+    protected static ILangCompilationProvider extensionLangCompilationProvider = null;
 
-    private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+    protected static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -171,9 +174,13 @@
             PrintWriter plan = new PrintWriter(actualFile);
             ILangCompilationProvider provider = queryFile.getName().endsWith("aql") ? aqlCompilationProvider
                     : sqlppCompilationProvider;
+            if (extensionLangCompilationProvider != null) {
+                provider = extensionLangCompilationProvider;
+            }
             IHyracksClientConnection hcc = integrationUtil.getHyracksClientConnection();
             AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider,
-                    new DefaultStatementExecutorFactory(null));
+                    new DefaultStatementExecutorFactory(
+                            (CompilerExtensionManager) AsterixAppContextInfo.INSTANCE.getExtensionManager()));
             try {
                 asterix.compile(true, false, false, true, true, false, false);
             } catch (AsterixException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 39a4d3b..3929113 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -52,8 +52,12 @@
         return setUp(cleanup, TEST_CONFIG_FILE_NAME, integrationUtil, true);
     }
 
+    public static List<ILibraryManager> setUp(boolean cleanup, String configFile) throws Exception {
+        return setUp(cleanup, configFile, integrationUtil, true);
+    }
+
     public static List<ILibraryManager> setUp(boolean cleanup, String configFile,
-            AsterixHyracksIntegrationUtil integrationUtil, boolean startHdfs) throws Exception {
+            AsterixHyracksIntegrationUtil alternateIntegrationUtil, boolean startHdfs) throws Exception {
         System.out.println("Starting setup");
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting setup");
@@ -63,6 +67,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("initializing pseudo cluster");
         }
+        integrationUtil = alternateIntegrationUtil;
         integrationUtil.init(cleanup);
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,8 +92,8 @@
         libraryManagers.add(AsterixAppContextInfo.INSTANCE.getLibraryManager());
         // Adds library managers for NCs, one-per-NC.
         for (NodeControllerService nc : integrationUtil.ncs) {
-            IAsterixAppRuntimeContext runtimeCtx =
-                    (IAsterixAppRuntimeContext) nc.getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
+                    .getApplicationObject();
             libraryManagers.add(runtimeCtx.getLibraryManager());
         }
         return libraryManagers;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
new file mode 100644
index 0000000..205edc2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string,
+  location:point
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
+
+
+insert into dataset TweetMessageuuids as $a(
+let $x :=
+[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)},
+{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)},
+{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)},
+{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)},
+{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}]
+for $y in $x
+return $y
+) returning
+let $x := create-circle($a.location,5.0)
+order by $a.tweetid
+return {
+  "x":$x,
+  "tweetid":$a.tweetid
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
new file mode 100644
index 0000000..391b248
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
@@ -0,0 +1,17 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- STABLE_SORT [$$16(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- COMMIT  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- INSERT_DELETE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$16]  |PARTITIONED|
+                        -- ASSIGN  |UNPARTITIONED|
+                          -- STREAM_PROJECT  |UNPARTITIONED|
+                            -- UNNEST  |UNPARTITIONED|
+                              -- ASSIGN  |UNPARTITIONED|
+                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
new file mode 100644
index 0000000..a75f9da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
new file mode 100644
index 0000000..6fc7112
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+ use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+[{ "tweetid":1,"message-text":"hello"},
+{"tweetid":2,"message-text":"goodbye"},
+{"tweetid":3,"message-text":"the end"},
+{"tweetid":4,"message-text":"what"},
+{"tweetid":5,"message-text":"good"}]
+) returning $message;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
new file mode 100644
index 0000000..9e77afb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-returning-fieldname
+ * Description     : Check fields returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
new file mode 100644
index 0000000..6a7be26
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-returning-fieldname
+ * Description     : Check fields returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+{ "message-text":"hello"}
+) returning $message.message-text;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
new file mode 100644
index 0000000..a75f9da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
new file mode 100644
index 0000000..417860d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-with-bad-return
+ * Description     : Throw an error
+ * Expected Result : Error
+ * Date            : Oct 2016
+ */
+
+ use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+[{ "tweetid":1,"message-text":"hello"},
+{"tweetid":2,"message-text":"goodbye"},
+{"tweetid":3,"message-text":"the end"},
+{"tweetid":4,"message-text":"what"},
+{"tweetid":5,"message-text":"good"}]
+) returning
+for $result in dataset TweetMessageuuids
+where $result.message-text = $message
+return $result;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
new file mode 100644
index 0000000..50ff8ae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : upsert-return-custom-result
+ * Description     : Check records returned on upsert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string,
+  location:point
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
new file mode 100644
index 0000000..9abf667
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : upsert-return-custom-result
+ * Description     : Check records returned on upsert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+ use dataverse test;
+
+upsert into dataset TweetMessageuuids as $a (
+let $x :=
+[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)},
+{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)},
+{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)},
+{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)},
+{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}]
+for $y in $x
+return $y
+) returning
+let $x := create-circle($a.location,5.0)
+order by $a.tweetid
+return {
+  "x":$x,
+  "tweetid":$a.tweetid
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
new file mode 100644
index 0000000..2ee9b1c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
@@ -0,0 +1,5 @@
+{ "tweetid": 1, "message-text": "hello" }
+{ "tweetid": 2, "message-text": "goodbye" }
+{ "tweetid": 4, "message-text": "what" }
+{ "tweetid": 3, "message-text": "the end" }
+{ "tweetid": 5, "message-text": "good" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
new file mode 100644
index 0000000..84ed78b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
@@ -0,0 +1 @@
+"hello"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
new file mode 100644
index 0000000..8706beb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
@@ -0,0 +1,5 @@
+{ "x": circle("6.0,6.0 5.0"), "tweetid": 1 }
+{ "x": circle("1.0,1.0 5.0"), "tweetid": 2 }
+{ "x": circle("3.0,6.0 5.0"), "tweetid": 4 }
+{ "x": circle("6.0,3.0 5.0"), "tweetid": 3 }
+{ "x": circle("5.0,6.0 5.0"), "tweetid": 5 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 85884f8..d0e342c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -1699,6 +1699,27 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="dml">
+      <compilation-unit name="insert-return-records">
+        <output-dir compare="Text">insert-return-records</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="insert-returning-fieldname">
+        <output-dir compare="Text">insert-returning-fieldname</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="insert-with-bad-return">
+        <output-dir compare="Text">insert-with-bad-return</output-dir>
+        <expected-error>Error: Cannot use datasets in an insert returning query</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="upsert-return-custom-result">
+        <output-dir compare="Text">upsert-return-custom-result</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
       <compilation-unit name="insert-src-dst-01">
         <output-dir compare="Text">insert-src-dst-01</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
index 510bc83..393beec 100644
--- a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
+++ b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
@@ -824,7 +824,7 @@
 
 #### Insert
 
-    InsertStatement ::= "insert" "into" "dataset" QualifiedName Query
+    InsertStatement ::= "insert" "into" "dataset" QualifiedName ( "as" Variable )? Query ( "returning" Query )?
 
 The AQL insert statement is used to insert data into a dataset.
 The data to be inserted comes from an AQL query expression.
@@ -834,13 +834,17 @@
 If the query part of an insert returns a single object, then the insert statement itself will
 be a single, atomic transaction.
 If the query part returns multiple objects, then each object inserted will be handled independently
-as a tranaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.)
+as a tranaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.).
+The optional "as Variable" provides a variable binding for the inserted records, which can be used in the "returning" clause.
+The optional "returning Query" allows users to run simple queries/functions on the records returned by the insert.
+This query cannot refer to any datasets.
+
 
 The following example illustrates a query-based insertion.
 
 ##### Example
 
-    insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user)
+    insert into dataset UsersCopy as $inserted (for $user in dataset FacebookUsers return $user ) returning $inserted.screen-name
 
 #### Delete
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
similarity index 75%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
index ad3c1c9..be9a245 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
@@ -20,18 +20,21 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 
-public interface IFeedLifecycleEventSubscriber {
+public interface IActiveLifecycleEventSubscriber {
 
-    public enum FeedLifecycleEvent {
+    public enum ActiveLifecycleEvent {
         FEED_INTAKE_STARTED,
         FEED_COLLECT_STARTED,
         FEED_INTAKE_FAILURE,
         FEED_COLLECT_FAILURE,
         FEED_INTAKE_ENDED,
-        FEED_COLLECT_ENDED
+        FEED_COLLECT_ENDED,
+        ACTIVE_JOB_STARTED,
+        ACTIVE_JOB_ENDED,
+        ACTIVE_JOB_FAILED
     }
 
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;
+    public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException;
 
-    public void handleFeedEvent(FeedLifecycleEvent event);
+    public void handleEvent(ActiveLifecycleEvent event);
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
similarity index 62%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
index 6e3cebce..7f8efc5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
@@ -22,26 +22,26 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
 
-public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber {
+public class ActiveLifecycleEventSubscriber implements IActiveLifecycleEventSubscriber {
 
-    private LinkedBlockingQueue<FeedLifecycleEvent> inbox;
+    private LinkedBlockingQueue<ActiveLifecycleEvent> inbox;
 
-    public FeedLifecycleEventSubscriber() {
-        this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>();
+    public ActiveLifecycleEventSubscriber() {
+        this.inbox = new LinkedBlockingQueue<>();
     }
 
     @Override
-    public void handleFeedEvent(FeedLifecycleEvent event) {
+    public void handleEvent(ActiveLifecycleEvent event) {
         inbox.add(event);
     }
 
     @Override
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException {
+    public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException {
         boolean eventOccurred = false;
-        FeedLifecycleEvent e = null;
-        Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator();
+        ActiveLifecycleEvent e;
+        Iterator<ActiveLifecycleEvent> eventsSoFar = inbox.iterator();
         while (eventsSoFar.hasNext()) {
             e = eventsSoFar.next();
             assertNoFailure(e);
@@ -57,9 +57,10 @@
         }
     }
 
-    private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException {
-        if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) {
-            throw new AsterixException("Failure in feed");
+    private void assertNoFailure(ActiveLifecycleEvent e) throws AsterixException {
+        if (e.equals(ActiveLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(ActiveLifecycleEvent.FEED_COLLECT_FAILURE)
+                || e.equals(ActiveLifecycleEvent.ACTIVE_JOB_FAILED)) {
+            throw new AsterixException("Failure in active job.");
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 0c49c92..02d2c8b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -34,8 +34,8 @@
 import org.apache.asterix.external.feed.api.FeedOperationCounter;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.State;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
@@ -63,7 +63,7 @@
 public class FeedEventsListener implements IActiveEntityEventsListener {
     private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class);
     private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
-    private final List<IFeedLifecycleEventSubscriber> subscribers;
+    private final List<IActiveLifecycleEventSubscriber> subscribers;
     private final Map<Long, ActiveJob> jobs;
     private final Map<Long, ActiveJob> intakeJobs;
     private final Map<EntityId, FeedIntakeInfo> entity2Intake;
@@ -145,7 +145,7 @@
             case FEED_CONNECT:
                 ((FeedConnectJobInfo) jobInfo).partitionStart();
                 if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
-                    notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+                    notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
                 }
                 break;
             case INTAKE:
@@ -161,7 +161,7 @@
         if (feedPipeline.get(message.getEntityId()).first.decrementAndGet() == 0) {
             ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
             jobInfo.setState(ActivityState.ACTIVE);
-            notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+            notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
         }
     }
 
@@ -339,10 +339,10 @@
         return locations;
     }
 
-    private synchronized void notifyFeedEventSubscribers(FeedLifecycleEvent event) {
+    private synchronized void notifyFeedEventSubscribers(ActiveLifecycleEvent event) {
         if (subscribers != null && !subscribers.isEmpty()) {
-            for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
-                subscriber.handleFeedEvent(event);
+            for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
+                subscriber.handleEvent(event);
             }
         }
     }
@@ -362,8 +362,8 @@
         // notify event listeners
         feedPipeline.remove(feedId);
         entity2Intake.remove(feedId);
-        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
-                : FeedLifecycleEvent.FEED_INTAKE_ENDED);
+        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? ActiveLifecycleEvent.FEED_INTAKE_FAILURE
+                : ActiveLifecycleEvent.FEED_INTAKE_ENDED);
     }
 
     private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
@@ -389,8 +389,8 @@
         connectJobInfos.remove(connectionId);
         jobs.remove(cInfo.getJobId().getId());
         // notify event listeners
-        FeedLifecycleEvent event =
-                failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_COLLECT_ENDED;
+        ActiveLifecycleEvent event =
+                failure ? ActiveLifecycleEvent.FEED_COLLECT_FAILURE : ActiveLifecycleEvent.FEED_COLLECT_ENDED;
         notifyFeedEventSubscribers(event);
     }
 
@@ -569,16 +569,16 @@
 
     }
 
-    public synchronized void registerFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+    public synchronized void registerFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
         subscribers.add(subscriber);
     }
 
-    public void deregisterFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+    public void deregisterFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
         subscribers.remove(subscriber);
     }
 
     public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
-            IFeedLifecycleEventSubscriber eventSubscriber) {
+            IActiveLifecycleEventSubscriber eventSubscriber) {
         boolean active = false;
         FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
         if (cInfo != null) {
@@ -643,7 +643,7 @@
     }
 
     @Override
-    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName) {
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
         return isConnectedToDataset(datasetName);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
deleted file mode 100644
index aac6676..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.external.feed.watch;
-
-import java.util.Map;
-
-public class FeedActivity implements Comparable<FeedActivity> {
-
-    private int activityId;
-
-    private final String dataverseName;
-    private final String datasetName;
-    private final String feedName;
-    private final Map<String, String> feedActivityDetails;
-
-    public static class FeedActivityDetails {
-        public static final String INTAKE_LOCATIONS = "intake-locations";
-        public static final String COMPUTE_LOCATIONS = "compute-locations";
-        public static final String STORAGE_LOCATIONS = "storage-locations";
-        public static final String COLLECT_LOCATIONS = "collect-locations";
-        public static final String FEED_POLICY_NAME = "feed-policy-name";
-        public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
-    }
-
-    public FeedActivity(String dataverseName, String feedName, String datasetName,
-            Map<String, String> feedActivityDetails) {
-        this.dataverseName = dataverseName;
-        this.feedName = feedName;
-        this.datasetName = datasetName;
-        this.feedActivityDetails = feedActivityDetails;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public String getDatasetName() {
-        return datasetName;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof FeedActivity)) {
-            return false;
-        }
-
-        if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).datasetName.equals(datasetName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).getFeedName().equals(feedName)) {
-            return false;
-        }
-        if (((FeedActivity) other).getActivityId() != (activityId)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
-    }
-
-    public String getConnectTimestamp() {
-        return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
-    }
-
-    public int getActivityId() {
-        return activityId;
-    }
-
-    public void setActivityId(int activityId) {
-        this.activityId = activityId;
-    }
-
-    public Map<String, String> getFeedActivityDetails() {
-        return feedActivityDetails;
-    }
-
-    @Override
-    public int compareTo(FeedActivity o) {
-        return o.getActivityId() - this.activityId;
-    }
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivityDetails.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivityDetails.java
new file mode 100644
index 0000000..496cc53
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivityDetails.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.feed.watch;
+
+public class FeedActivityDetails {
+    public static final String INTAKE_LOCATIONS = "intake-locations";
+    public static final String COMPUTE_LOCATIONS = "compute-locations";
+    public static final String STORAGE_LOCATIONS = "storage-locations";
+    public static final String COLLECT_LOCATIONS = "collect-locations";
+    public static final String FEED_POLICY_NAME = "feed-policy-name";
+    public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
+}
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index d4bf6bc..f2f04d8 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.IParser;
@@ -72,9 +72,9 @@
     public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
         this.query = new Query(false);
         EntityId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
-        Feed subscriberFeed =
-                MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(),
-                        connectionRequest.getReceivingFeedId().getEntityName());
+        Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
+                connectionRequest.getReceivingFeedId().getDataverse(),
+                connectionRequest.getReceivingFeedId().getEntityName());
         if (subscriberFeed == null) {
             throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found.");
         }
@@ -95,8 +95,8 @@
         StringBuilder builder = new StringBuilder();
         builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n");
         builder.append("set" + " " + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
-        builder.append("set" + " " + FeedActivity.FeedActivityDetails.FEED_POLICY_NAME + " " + "'"
-                + connectionRequest.getPolicy() + "'" + ";\n");
+        builder.append("set" + " " + FeedActivityDetails.FEED_POLICY_NAME + " " + "'" + connectionRequest.getPolicy()
+                + "'" + ";\n");
 
         builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " ");
         builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'"
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index d17861e..463805b 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -926,18 +926,27 @@
 
 InsertStatement InsertStatement() throws ParseException:
 {
+  VariableExpr var = null;
   Pair<Identifier,Identifier> nameComponents = null;
   Query query;
+  Query returnQuery = null;
   boolean upsert = false;
 }
 {
-  (<INSERT>|<UPSERT>{ upsert = true; }) <INTO> <DATASET> nameComponents = QualifiedName() query = Query()
+  (<INSERT>|<UPSERT>{ upsert = true; }) <INTO> <DATASET> nameComponents = QualifiedName()
+  (<AS> var = Variable())?
+  {
+    if(var != null){
+      getCurrentScope().addNewVarSymbolToScope(var.getVar());
+    }
+  }
+  query = Query() ( <RETURNING> returnQuery = Query())?
     {
       query.setTopLevel(true);
       if(upsert){
-        return new UpsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+        return new UpsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter(), var, returnQuery);
       } else{
-        return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+        return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter(), var, returnQuery);
       }
     }
 }
@@ -2646,6 +2655,7 @@
   | <PRIMARY : "primary">
   | <REFRESH : "refresh">
   | <RETURN : "return">
+  | <RETURNING : "returning">
   | <RTREE : "rtree">
   | <RUN : "run">
   | <SATISFIES : "satisfies">
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java
index b2dc72e..987c3d9 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.commons.lang3.ObjectUtils;
@@ -29,13 +30,18 @@
     private final Identifier dataverseName;
     private final Identifier datasetName;
     private final Query query;
-    private final int varCounter;
+    private int varCounter;
+    private final VariableExpr var;
+    private Query returnQuery;
 
-    public InsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter) {
+    public InsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter,
+            VariableExpr var, Query returnQuery) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.query = query;
         this.varCounter = varCounter;
+        this.var = var;
+        this.returnQuery = returnQuery;
     }
 
     @Override
@@ -55,10 +61,26 @@
         return query;
     }
 
+    public void addToVarCounter(int addition) {
+        varCounter += addition;
+    }
+
     public int getVarCounter() {
         return varCounter;
     }
 
+    public VariableExpr getVar() {
+        return var;
+    }
+
+    public Query getReturnQuery() {
+        return returnQuery;
+    }
+
+    public void setRewrittenReturnQuery(Query rewrittenReturnQuery) {
+        this.returnQuery = rewrittenReturnQuery;
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
         return visitor.visit(this, arg);
@@ -66,7 +88,7 @@
 
     @Override
     public int hashCode() {
-        return ObjectUtils.hashCodeMulti(datasetName, dataverseName, query);
+        return ObjectUtils.hashCodeMulti(datasetName, dataverseName, query, varCounter, var, returnQuery);
     }
 
     @Override
@@ -79,12 +101,17 @@
         }
         InsertStatement target = (InsertStatement) object;
         return ObjectUtils.equals(datasetName, target.datasetName)
-                && ObjectUtils.equals(dataverseName, target.dataverseName) && ObjectUtils.equals(query, target.query);
+                && ObjectUtils.equals(dataverseName, target.dataverseName) && ObjectUtils.equals(query, target.query)
+                && ObjectUtils.equals(varCounter, target.varCounter) && ObjectUtils.equals(var, target.var)
+                && ObjectUtils.equals(returnQuery, target.returnQuery);
     }
 
     @Override
     public byte getCategory() {
-        return Category.UPDATE;
+        if (var == null) {
+            return Category.UPDATE;
+        }
+        return Category.QUERY;
     }
 
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
index 1fb1de2..a82d948 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
@@ -19,12 +19,14 @@
 package org.apache.asterix.lang.common.statement;
 
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.struct.Identifier;
 
 public class UpsertStatement extends InsertStatement {
 
-    public UpsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter) {
-        super(dataverseName, datasetName, query, varCounter);
+    public UpsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter,
+            VariableExpr var, Query returnQuery) {
+        super(dataverseName, datasetName, query, varCounter, var, returnQuery);
     }
 
     @Override
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index bcbdd26..50682c9 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -969,7 +969,7 @@
   <INSERT> <INTO> nameComponents = QualifiedName() query = Query(false)
     {
       query.setTopLevel(true);
-      return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+      return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter(), null, null);
     }
 }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
index 3b35486..f3592e3 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
@@ -20,14 +20,13 @@
 
 import java.io.IOException;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.visitors.IOMVisitor;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * ADateTime type represents the timestamp values.
@@ -40,12 +39,15 @@
  * - minute; <br/>
  * - second; <br/>
  * - millisecond. <br/>
- * By default, an ADateTime value is a UTC time value, i.e., there is no timezone information maintained. However user can use the timezone based AQL function to convert a UTC time to a timezone-embedded time.
+ * By default, an ADateTime value is a UTC time value, i.e., there is no timezone information maintained. However user
+ * can use the timezone based AQL function to convert a UTC time to a timezone-embedded time.
  * <p/>
  * And the string representation of an ADateTime value follows the ISO8601 standard, in the following format:<br/>
  * [+|-]YYYY-MM-DDThh:mm:ss.xxxZ
  * <p/>
- * Internally, an ADateTime value is stored as the number of milliseconds elapsed since 1970-01-01T00:00:00.000Z (also called chronon time). Functions to convert between a string representation of an ADateTime and its chronon time are implemented in {@link GregorianCalendarSystem}.
+ * Internally, an ADateTime value is stored as the number of milliseconds elapsed since 1970-01-01T00:00:00.000Z (also
+ * called chronon time). Functions to convert between a string representation of an ADateTime and its chronon time are
+ * implemented in {@link GregorianCalendarSystem}.
  * <p/>
  */
 public class ADateTime implements IAObject {
@@ -122,6 +124,13 @@
         return sbder.toString();
     }
 
+    public String toSimpleString() throws IOException {
+        StringBuilder sbder = new StringBuilder();
+        GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
+                GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
+        return sbder.toString();
+    }
+
     public long getChrononTime() {
         return chrononTime;
     }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java
index 329ae53..025866c 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java
@@ -37,13 +37,12 @@
     public static final int UUID_CHARS = 36;
     public static final int UUID_BYTES = 16;
 
-    protected final byte [] uuidBytes;
+    protected final byte[] uuidBytes;
 
-    protected static final char [] CHARS;
-
+    private static final char[] CHARS;
 
     static {
-        CHARS = new char [16];
+        CHARS = new char[16];
         for (int i = 0; i < 16; i++) {
             CHARS[i] = Character.forDigit(i, 16);
         }
@@ -53,7 +52,7 @@
         this(new byte[UUID_BYTES]);
     }
 
-    public AUUID(byte [] bytes) {
+    public AUUID(byte[] bytes) {
         this.uuidBytes = bytes;
     }
 
@@ -95,11 +94,16 @@
         return appendLiteralOnly(buf).append('}').toString();
     }
 
+    public String toSimpleString() {
+        StringBuilder buf = new StringBuilder(UUID_CHARS + 9);
+        return appendLiteralOnly(buf).toString();
+    }
+
     public StringBuilder appendLiteralOnly(StringBuilder buf) {
         return appendLiteralOnly(uuidBytes, 0, buf);
     }
 
-    private static StringBuilder digits(byte b [], int offset, int count, StringBuilder result) {
+    private static StringBuilder digits(byte b[], int offset, int count, StringBuilder result) {
         for (int i = 0; i < count; i++) {
             result.append(CHARS[(b[offset + i] >> 4) & 0xf]);
             result.append(CHARS[b[offset + i] & 0xf]);
@@ -107,7 +111,7 @@
         return result;
     }
 
-    public static StringBuilder appendLiteralOnly(byte [] bytes, int offset, StringBuilder result) {
+    public static StringBuilder appendLiteralOnly(byte[] bytes, int offset, StringBuilder result) {
         digits(bytes, offset, 4, result).append('-');
         digits(bytes, offset + 4, 2, result).append('-');
         digits(bytes, offset + 6, 2, result).append('-');
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index cc7a75f..2d13baf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -26,7 +26,7 @@
     DISTRIBUTE_RESULT,
     EMPTYTUPLESOURCE,
     EXCHANGE,
-    EXTENSION_OPERATOR,
+    DELEGATE_OPERATOR,
     EXTERNAL_LOOKUP,
     GROUP,
     INDEX_INSERT_DELETE_UPSERT,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractDelegatedLogicalOperator.java
similarity index 95%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractDelegatedLogicalOperator.java
index dd555e2..9a66e72 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractDelegatedLogicalOperator.java
@@ -27,7 +27,7 @@
 /**
  * @author rico
  */
-public abstract class AbstractExtensibleLogicalOperator implements IOperatorExtension {
+public abstract class AbstractDelegatedLogicalOperator implements IOperatorDelegate {
 
     private AbstractLogicalOperator.ExecutionMode mode = AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
     protected List<LogicalVariable> schema;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DelegateOperator.java
similarity index 88%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DelegateOperator.java
index d2f7715..3667e6b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DelegateOperator.java
@@ -34,11 +34,11 @@
 /**
  * @author rico
  */
-public class ExtensionOperator extends AbstractLogicalOperator {
+public class DelegateOperator extends AbstractLogicalOperator {
 
-    private IOperatorExtension delegate;
+    private IOperatorDelegate delegate;
 
-    public ExtensionOperator(IOperatorExtension delegate) {
+    public DelegateOperator(IOperatorDelegate delegate) {
         super();
         if (delegate == null) {
             throw new IllegalArgumentException("delegate cannot be null!");
@@ -54,13 +54,14 @@
     }
 
     @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
         return delegate.acceptExpressionTransform(transform);
     }
 
     @Override
     public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitExtensionOperator(this, arg);
+        return visitor.visitDelegateOperator(this, arg);
     }
 
     @Override
@@ -80,10 +81,10 @@
 
     @Override
     public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.EXTENSION_OPERATOR;
+        return LogicalOperatorTag.DELEGATE_OPERATOR;
     }
 
-    public IOperatorExtension getNewInstanceOfDelegateOperator() {
+    public IOperatorDelegate getNewInstanceOfDelegateOperator() {
         return delegate.newInstance();
     }
 
@@ -117,7 +118,7 @@
         return delegate.toString();
     }
 
-    public IOperatorExtension getDelegate() {
+    public IOperatorDelegate getDelegate() {
         return delegate;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorDelegate.java
similarity index 95%
rename from hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
rename to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorDelegate.java
index e61d9a2..a052c3f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorDelegate.java
@@ -30,13 +30,13 @@
 /**
  * @author rico
  */
-public interface IOperatorExtension {
+public interface IOperatorDelegate {
 
     void setExecutionMode(ExecutionMode mode);
 
     boolean isMap();
 
-    public IOperatorExtension newInstance();
+    public IOperatorDelegate newInstance();
 
     boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index d278078..d0aea60 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -35,7 +35,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -164,7 +164,7 @@
     }
 
     @Override
-    public Long visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Long visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return UNKNOWN;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index b259869..4843f81 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -53,7 +53,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -764,7 +764,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, IOptimizationContext ctx) throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 7f34e8b..b3b9da1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -41,7 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -119,9 +119,9 @@
     }
 
     @Override
-    public Boolean visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
-        ExtensionOperator aop = (ExtensionOperator) copyAndSubstituteVar(op, arg);
-        if (aop.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+    public Boolean visitDelegateOperator(DelegateOperator op, ILogicalOperator arg) throws AlgebricksException {
+        DelegateOperator aop = (DelegateOperator) copyAndSubstituteVar(op, arg);
+        if (aop.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return Boolean.FALSE;
         }
         return Boolean.TRUE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 58b31f8..52d8e64 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -44,7 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -499,7 +499,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index f4b3195..0da9110 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -41,7 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -548,7 +548,7 @@
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg)
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 7e92869..bdabbca 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -34,7 +34,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -350,7 +350,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 442899f..7543e5f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -39,7 +39,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -379,8 +379,8 @@
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
-        return new ExtensionOperator(op.getNewInstanceOfDelegateOperator());
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+        return new DelegateOperator(op.getNewInstanceOfDelegateOperator());
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index 9f1acea..c96276f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -36,7 +36,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -164,7 +164,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, IOptimizationContext ctx) throws AlgebricksException {
         return null;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 3645aff..ec96d48 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -40,7 +40,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -283,7 +283,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         op.getDelegate().getProducedVariables(producedVariables);
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index a746cf2..28f4e5e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -38,7 +38,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -319,7 +319,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 7345928..cf24ee7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -38,7 +38,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -471,7 +471,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+    public Void visitDelegateOperator(DelegateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
             throws AlgebricksException {
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 3daa00f..b8cb4ff 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -38,7 +38,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -443,7 +443,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         op.getDelegate().getUsedVariables(usedVariables);
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index d3dd166..71ac8f3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -36,7 +36,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -438,7 +438,7 @@
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append(op.toString());
         return null;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index f5ff8b4..8da41e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -26,7 +26,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -78,7 +78,7 @@
 
     public R visitSelectOperator(SelectOperator op, T arg) throws AlgebricksException;
 
-    public R visitExtensionOperator(ExtensionOperator op, T arg) throws AlgebricksException;
+    public R visitDelegateOperator(DelegateOperator op, T arg) throws AlgebricksException;
 
     public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException;
 
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index c5d7291..080828d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -37,7 +37,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -164,7 +164,7 @@
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return visit(op);
     }