Ensure nextFrame and flush are not called in a failed pipeline.

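This change plumbs frame-writer contract checking through job
submission. When the CCConfig option ENFORCE_FRAME_WRITER_PROTOCOL is
enabled, QueryTranslator adds JobFlag.ENFORCE_CONTRACT to the flags it
passes with every job it submits, and JobUtils.runJob gains an overload
that forwards an EnumSet<JobFlag> to IHyracksClientConnection.startJob
(the old three-argument overload delegates with
EnumSet.noneOf(JobFlag.class)). AsterixHyracksIntegrationUtil turns the
option on so integration tests run with the checks enabled, drops its
unused runJob helper, and no longer appends an extra "asterixdb"
segment to default.dir. Call sites are also updated for the
TaskUtil.put and setOutputFrameWriter renames, and the list
serializer/tokenizer paths now throw HyracksDataException (new error
code FIELD_SHOULD_HAVE_CONCRETE_TYPE) instead of AsterixException.

The sketch below only illustrates the contract being enforced; it is
not the actual Hyracks implementation, and the wrapper class name is
hypothetical. Assuming the standard IFrameWriter lifecycle (open, then
nextFrame/flush, then fail and/or close):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Hypothetical wrapper showing the enforced rule: once fail() has
    // been called, nextFrame() and flush() must not be called again.
    class ContractCheckingFrameWriter implements IFrameWriter {
        private final IFrameWriter delegate;
        private boolean failed = false;

        ContractCheckingFrameWriter(IFrameWriter delegate) {
            this.delegate = delegate;
        }

        @Override
        public void open() throws HyracksDataException {
            delegate.open();
        }

        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            if (failed) {
                throw new IllegalStateException("nextFrame() after fail()");
            }
            delegate.nextFrame(buffer);
        }

        @Override
        public void flush() throws HyracksDataException {
            if (failed) {
                throw new IllegalStateException("flush() after fail()");
            }
            delegate.flush();
        }

        @Override
        public void fail() throws HyracksDataException {
            failed = true;
            delegate.fail();
        }

        @Override
        public void close() throws HyracksDataException {
            delegate.close();
        }
    }

Callers that want the checks regardless of the cluster setting can pass
the flag explicitly, e.g.
JobUtils.runJob(hcc, spec, EnumSet.of(JobFlag.ENFORCE_CONTRACT), true).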
Change-Id: I7859bf51e2203ef8518168e94a8a2d935193f9f3
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2a0631c..a815f3a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.net.Inet4Address;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -42,9 +41,6 @@
 import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -131,7 +127,8 @@
         ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
         ccConfig.setResultTTL(120000L);
         ccConfig.setResultSweepThreshold(1000L);
-        configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb"));
+        ccConfig.setEnforceFrameWriterProtocol(true);
+        configManager.set(ControllerConfig.Option.DEFAULT_DIR, getDefaultStoragePath());
         return ccConfig;
     }
 
@@ -150,7 +147,7 @@
         ncConfig.setResultTTL(120000L);
         ncConfig.setResultSweepThreshold(1000L);
         ncConfig.setVirtualNC(true);
-        configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb", ncName));
+        configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), ncName));
         return ncConfig;
     }
 
@@ -215,13 +212,6 @@
         }
     }
 
-    public void runJob(JobSpecification spec) throws Exception {
-        GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
-        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-        GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
-        hcc.waitForCompletion(jobId);
-    }
-
     protected String getDefaultStoragePath() {
         return joinPath("target", "io", "dir");
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 80c05db..bda57a7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -191,8 +192,10 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.UnmanagedFileSplit;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 /*
@@ -214,6 +217,7 @@
     protected final IRewriterFactory rewriterFactory;
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
+    protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
 
     public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
@@ -228,6 +232,9 @@
         rewriterFactory = compliationProvider.getRewriterFactory();
         activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
         this.executorService = executorService;
+        if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) {
+            this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
+        }
     }
 
     public SessionOutput getSessionOutput() {
@@ -621,7 +628,7 @@
                 progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
 
                 // #. runJob
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
 
                 // #. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -653,7 +660,7 @@
                     JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -900,7 +907,7 @@
                                 "Failed to create job spec for replicating Files Index For external dataset");
                     }
                     filesIndexReplicated = true;
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 }
             }
 
@@ -937,7 +944,7 @@
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             // #. create the index artifact in NC.
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
@@ -948,7 +955,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -986,7 +993,7 @@
                             ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1005,7 +1012,7 @@
                     JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1189,7 +1196,7 @@
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1226,7 +1233,7 @@
                 // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        runJob(hcc, jobSpec);
                     }
                 } catch (Exception e2) {
                     // do not throw an exception since the metadata still needs to be compensated.
@@ -1405,7 +1412,7 @@
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 }
 
                 // #. begin a new transaction
@@ -1466,7 +1473,7 @@
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 }
 
                 // #. begin a new transaction
@@ -1496,7 +1503,7 @@
                 // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        runJob(hcc, jobSpec);
                     }
                 } catch (Exception e2) {
                     // do not throw an exception since the metadata still needs to be compensated.
@@ -1659,7 +1666,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
-                JobUtils.runJob(hcc, spec, true);
+                runJob(hcc, spec);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -1725,7 +1732,7 @@
                 if (jobSpec == null) {
                     return jobSpec;
                 }
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             } finally {
                 locker.unlock();
             }
@@ -1753,7 +1760,7 @@
             bActiveTxn = false;
 
             if (jobSpec != null && !compileOnly) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
             return jobSpec;
         } catch (Exception e) {
@@ -1935,7 +1942,7 @@
             } else {
                 JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
                         MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
-                JobUtils.runJob(hcc, spec, true);
+                runJob(hcc, spec);
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
             }
 
@@ -2022,7 +2029,7 @@
             activeEventHandler.registerListener(listener);
             IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
             feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
-            JobUtils.runJob(hcc, feedJob,
+            JobUtils.runJob(hcc, feedJob, jobFlags,
                     Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
             eventSubscriber.sync();
             LOGGER.log(Level.INFO, "Submitted");
@@ -2211,7 +2218,7 @@
 
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2300,14 +2307,14 @@
                 }
                 break;
             case IMMEDIATE:
-                createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+                createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
                 }, clientContextId, ctx);
                 break;
             case DEFERRED:
-                createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+                createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
                     if (outMetadata != null) {
                         outMetadata.getResultSets()
@@ -2325,7 +2332,7 @@
             ResultSetId resultSetId, MutableBoolean printed) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
-            createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
+            createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
                 final ResultHandle handle = new ResultHandle(id, resultSetId);
                 ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING);
                 ResultUtil.printResultHandle(sessionOutput, handle);
@@ -2353,16 +2360,20 @@
         }
     }
 
-    private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler,
-            IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId,
-            IStatementExecutorContext ctx) throws Exception {
+    private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception {
+        JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+    }
+
+    private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+            IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
+            String clientContextId, IStatementExecutorContext ctx) throws Exception {
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
                 return;
             }
-            final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (ctx != null && clientContextId != null) {
                 ctx.put(clientContextId, jobId); // Adds the running job into the context.
             }
@@ -2507,14 +2518,14 @@
             transactionState = TransactionState.BEGIN;
 
             // run the files update job
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles,
                             appendedFiles, metadataProvider);
                     // run the files update job
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 }
             }
 
@@ -2533,7 +2544,7 @@
             bActiveTxn = false;
             transactionState = TransactionState.READY_TO_COMMIT;
             // We don't release the latch since this job is expected to be quick
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
             // Start a new metadata transaction to record the final state of the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2602,7 +2613,7 @@
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 try {
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 } catch (Exception e2) {
                     // This should never happen -- fix throw illegal
                     e.addSuppressed(e2);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index cbe551b..7d4b41d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -205,7 +205,7 @@
                 NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
         BTreeSearchOperatorNodePushable searchOp =
                 searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1);
-        emptyTupleOp.setFrameWriter(0, searchOp,
+        emptyTupleOp.setOutputFrameWriter(0, searchOp,
                 primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
         searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
         return emptyTupleOp;
@@ -312,7 +312,7 @@
     public IHyracksTaskContext createTestContext(boolean withMessaging) throws HyracksDataException {
         IHyracksTaskContext ctx = TestUtils.create(KB32);
         if (withMessaging) {
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
         }
         ctx = Mockito.spy(ctx);
         Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 5e3da5f..b80f8f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -40,6 +40,9 @@
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -59,6 +62,11 @@
         ExternalProperties mockAsterixExternalProperties = mock(ExternalProperties.class);
         when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties);
         when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002);
+        ICCServiceContext mockServiceContext = mock(ICCServiceContext.class);
+        when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext);
+        IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class);
+        when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig);
+        when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true);
 
         // Mocks AsterixClusterProperties.
         Cluster mockCluster = mock(Cluster.class);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index 48ca338..b1c7ff3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -72,7 +72,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
@@ -152,7 +152,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
             writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE + 1);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
                     Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
@@ -236,7 +236,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE);
             ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
@@ -294,7 +294,7 @@
             IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
             VSizeFrame message = new VSizeFrame(ctx);
             VSizeFrame tempBuffer = new VSizeFrame(ctx);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().clear();
             message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
             message.getBuffer().flip();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index ef72c67..1e3960f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -136,7 +136,7 @@
                         marker.getBuffer().putLong(markerId);
                         marker.getBuffer().flip();
                         markerId++;
-                        TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, marker, ctx);
+                        TaskUtil.put(HyracksConstants.KEY_MESSAGE, marker, ctx);
                         tupleAppender.flush(insertOp);
                     }
                     ITupleReference tuple = tupleGenerator.next();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index e8f018e..6f666f4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -11,7 +11,7 @@
     "compiler\.joinmemory" : 262144,
     "compiler\.parallelism" : 0,
     "compiler\.sortmemory" : 327680,
-    "default\.dir" : "target/io/dir/asterixdb",
+    "default\.dir" : "target/io/dir",
     "instance\.name" : "DEFAULT_INSTANCE",
     "log\.level" : "INFO",
     "max\.wait\.active\.cluster" : 60,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index e099835..5f0c47b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -11,7 +11,7 @@
     "compiler\.joinmemory" : 262144,
     "compiler\.parallelism" : -1,
     "compiler\.sortmemory" : 327680,
-    "default\.dir" : "target/io/dir/asterixdb",
+    "default\.dir" : "target/io/dir",
     "instance\.name" : "DEFAULT_INSTANCE",
     "log\.level" : "WARNING",
     "max\.wait\.active\.cluster" : 60,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index b3fe5cc..b03de79 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -11,7 +11,7 @@
     "compiler\.joinmemory" : 262144,
     "compiler\.parallelism" : 3,
     "compiler\.sortmemory" : 327680,
-    "default\.dir" : "target/io/dir/asterixdb",
+    "default\.dir" : "target/io/dir",
     "instance\.name" : "DEFAULT_INSTANCE",
     "log\.level" : "WARNING",
     "max\.wait\.active\.cluster" : 60,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 3f9fba9..1c610ac 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -83,7 +83,7 @@
         try {
             if (isPrimary && ctx.getSharedObject() != null) {
                 PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback(lsmIndex);
-                TaskUtil.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+                TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
             }
             writer.open();
             modCallback =
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index df647d5..5fc1837 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -62,6 +62,7 @@
     public static final int TYPE_CONVERT_INTEGER_SOURCE = 19;
     public static final int TYPE_CONVERT_INTEGER_TARGET = 20;
     public static final int TYPE_CONVERT_OUT_OF_BOUND = 21;
+    public static final int FIELD_SHOULD_HAVE_CONCRETE_TYPE = 22;
     public static final int INSTANTIATION_ERROR = 100;
 
     // Compilation errors
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index 41f9e67..cacbfbc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -19,7 +19,10 @@
 
 package org.apache.asterix.common.utils;
 
+import java.util.EnumSet;
+
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
@@ -32,8 +35,13 @@
 
     public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
             throws Exception {
+        return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion);
+    }
+
+    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+            boolean waitForCompletion) throws Exception {
         spec.setMaxReattempts(0);
-        final JobId jobId = hcc.startJob(spec);
+        final JobId jobId = hcc.startJob(spec, jobFlags);
         if (waitForCompletion) {
             hcc.waitForCompletion(jobId);
         }
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 7204e34..e303ba1 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -55,6 +55,7 @@
 19 = Can't convert integer types. The source type should be one of %1$s.
 20 = Can't convert integer types. The target type should be one of %1$s.
 21 = Source value %1$s is out of range that %2$s can hold - %2$s.MAX_VALUE: %3$s, %2$s.MIN_VALUE: %4$s
+22 = Field value has type tag ANY, but it should have a concrete type
 100 = Unable to instantiate class %1$s
 
 # Compile-time check errors
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8a7bda9..2876ea6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -66,7 +66,7 @@
             FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
             adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
             IFrame message = new VSizeFrame(ctx);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
             /*
              * Set null feed message. Feed pipeline carries with it a message with each frame
              * Initially, the message is set to a null message that can be changed by feed adapters.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 0558ead..24a7462 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -106,7 +106,7 @@
         this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
         this.message = new VSizeFrame(ctx);
-        TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+        TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.opDesc = feedMetaOperatorDescriptor;
         this.recordDescProvider = recordDescProvider;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 662b543..97c1115d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -98,7 +98,7 @@
         this.feedManager = (ActiveManager) ((INcApplicationContext) ctx.getJobletContext().getServiceContext()
                 .getApplicationContext()).getActiveManager();
         this.message = new VSizeFrame(ctx);
-        TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+        TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
     }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java
index 90af41b..c644bbe 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AOrderedListBinaryTokenizer.java
@@ -18,11 +18,11 @@
  */
 package org.apache.asterix.dataflow.data.common;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
@@ -64,7 +64,7 @@
             length = NonTaggedFormatUtil.getFieldValueLength(data, itemOffset, typeTag, false);
             // Last param is a hack to pass the type tag.
             token.reset(data, itemOffset, itemOffset + length, length, data[start + 1]);
-        } catch (AsterixException e) {
+        } catch (HyracksDataException e) {
             throw new IllegalStateException(e);
         }
         itemIndex++;
@@ -79,7 +79,7 @@
         this.itemIndex = 0;
     }
 
-    protected int getItemOffset(byte[] data, int start, int itemIndex) throws AsterixException {
+    protected int getItemOffset(byte[] data, int start, int itemIndex) throws HyracksDataException {
         return AOrderedListSerializerDeserializer.getItemOffset(data, start, itemIndex);
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java
index b90e87f..6926713 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/AUnorderedListBinaryTokenizer.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.dataflow.data.common;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
 
 public class AUnorderedListBinaryTokenizer extends AOrderedListBinaryTokenizer {
@@ -29,7 +29,7 @@
     }
 
     @Override
-    protected int getItemOffset(byte[] data, int start, int itemIndex) throws AsterixException {
+    protected int getItemOffset(byte[] data, int start, int itemIndex) throws HyracksDataException {
         return AUnorderedListSerializerDeserializer.getItemOffset(data, start, itemIndex);
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java
index da95091..0a66d56 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AOrderedListSerializerDeserializer.java
@@ -24,7 +24,6 @@
 import java.util.List;
 
 import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.IAObject;
@@ -42,7 +41,8 @@
 public class AOrderedListSerializerDeserializer implements ISerializerDeserializer<AOrderedList> {
 
     private static final long serialVersionUID = 1L;
-    public static final AOrderedListSerializerDeserializer SCHEMALESS_INSTANCE = new AOrderedListSerializerDeserializer();
+    public static final AOrderedListSerializerDeserializer SCHEMALESS_INSTANCE =
+            new AOrderedListSerializerDeserializer();
 
     private final IAType itemType;
     @SuppressWarnings("rawtypes")
@@ -74,18 +74,19 @@
             ISerializerDeserializer currentDeserializer = deserializer;
             if (itemType.getTypeTag() == ATypeTag.ANY && typeTag != ATypeTag.ANY) {
                 currentItemType = TypeTagUtil.getBuiltinTypeByTag(typeTag);
-                currentDeserializer = SerializerDeserializerProvider.INSTANCE
-                        .getNonTaggedSerializerDeserializer(currentItemType);
+                currentDeserializer =
+                        SerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(currentItemType);
             }
 
-            List<IAObject> items = new ArrayList<IAObject>();
+            List<IAObject> items = new ArrayList<>();
             in.readInt(); // list size
             int numberOfitems;
             numberOfitems = in.readInt();
             if (numberOfitems > 0) {
                 if (!NonTaggedFormatUtil.isFixedSizedCollection(currentItemType)) {
-                    for (int i = 0; i < numberOfitems; i++)
+                    for (int i = 0; i < numberOfitems; i++) {
                         in.readInt();
+                    }
                 }
                 for (int i = 0; i < numberOfitems; i++) {
                     IAObject v = (IAObject) currentDeserializer.deserialize(in);
@@ -119,14 +120,15 @@
     }
 
     public static int getNumberOfItems(byte[] serOrderedList, int offset) {
-        if (serOrderedList[offset] == ATypeTag.ARRAY.serialize())
+        if (serOrderedList[offset] == ATypeTag.ARRAY.serialize()) {
             // 6 = tag (1) + itemTag (1) + list size (4)
             return AInt32SerializerDeserializer.getInt(serOrderedList, offset + 6);
-        else
+        } else {
             return -1;
+        }
     }
 
-    public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+    public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
         if (serOrderedList[offset] == ATypeTag.ARRAY.serialize()) {
             ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serOrderedList[offset + 1]);
             if (NonTaggedFormatUtil.isFixedSizedCollection(typeTag)) {
@@ -136,8 +138,9 @@
                 return offset + AInt32SerializerDeserializer.getInt(serOrderedList, offset + 10 + (4 * itemIndex));
             }
             // 10 = tag (1) + itemTag (1) + list size (4) + number of items (4)
-        } else
+        } else {
             return -1;
+        }
     }
 
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java
index dd95586..a6090af 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AUnorderedListSerializerDeserializer.java
@@ -23,7 +23,6 @@
 import java.util.ArrayList;
 
 import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AUnorderedList;
 import org.apache.asterix.om.base.IACursor;
@@ -43,7 +42,8 @@
 
     private static final long serialVersionUID = 1L;
 
-    public static final AUnorderedListSerializerDeserializer SCHEMALESS_INSTANCE = new AUnorderedListSerializerDeserializer();
+    public static final AUnorderedListSerializerDeserializer SCHEMALESS_INSTANCE =
+            new AUnorderedListSerializerDeserializer();
 
     private final IAType itemType;
     private final AUnorderedListType unorderedlistType;
@@ -76,18 +76,19 @@
             ISerializerDeserializer currentDeserializer = deserializer;
             if (itemType.getTypeTag() == ATypeTag.ANY && typeTag != ATypeTag.ANY) {
                 currentItemType = TypeTagUtil.getBuiltinTypeByTag(typeTag);
-                currentDeserializer = SerializerDeserializerProvider.INSTANCE
-                        .getNonTaggedSerializerDeserializer(currentItemType);
+                currentDeserializer =
+                        SerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(currentItemType);
             }
 
             in.readInt(); // list size
             int numberOfitems;
             numberOfitems = in.readInt();
-            ArrayList<IAObject> items = new ArrayList<IAObject>();
+            ArrayList<IAObject> items = new ArrayList<>();
             if (numberOfitems > 0) {
                 if (!NonTaggedFormatUtil.isFixedSizedCollection(currentItemType)) {
-                    for (int i = 0; i < numberOfitems; i++)
+                    for (int i = 0; i < numberOfitems; i++) {
                         in.readInt();
+                    }
                 }
                 for (int i = 0; i < numberOfitems; i++) {
                     items.add((IAObject) currentDeserializer.deserialize(in));
@@ -121,14 +122,15 @@
     }
 
     public static int getNumberOfItems(byte[] serOrderedList, int offset) {
-        if (serOrderedList[offset] == ATypeTag.MULTISET.serialize())
+        if (serOrderedList[offset] == ATypeTag.MULTISET.serialize()) {
             // 6 = tag (1) + itemTag (1) + list size (4)
             return AInt32SerializerDeserializer.getInt(serOrderedList, offset + 6);
-        else
+        } else {
             return -1;
+        }
     }
 
-    public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+    public static int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
         if (serOrderedList[offset] == ATypeTag.MULTISET.serialize()) {
             ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serOrderedList[offset + 1]);
             if (NonTaggedFormatUtil.isFixedSizedCollection(typeTag)) {
@@ -138,8 +140,9 @@
                 return offset + AInt32SerializerDeserializer.getInt(serOrderedList, offset + 10 + (4 * itemIndex));
             }
             // 10 = tag (1) + itemTag (1) + list size (4) + number of items (4)
-        } else
+        } else {
             return -1;
+        }
     }
 
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java
index f2e7299..7a2efe0 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/AListPointable.java
@@ -29,6 +29,7 @@
 import org.apache.asterix.om.util.container.IObjectFactory;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.AbstractPointable;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IPointableFactory;
@@ -123,7 +124,7 @@
         return NonTaggedFormatUtil.isFixedSizedCollection(inputType.getItemType());
     }
 
-    public int getFixedLength(AbstractCollectionType inputType) throws AsterixException {
+    public int getFixedLength(AbstractCollectionType inputType) throws HyracksDataException {
         return NonTaggedFormatUtil.getFieldValueLength(bytes, 0, inputType.getItemType().getTypeTag(), false);
     }
 
@@ -163,7 +164,7 @@
     // Item accessors
     // ----------------------
 
-    public int getItemOffset(AbstractCollectionType inputType, int index) throws AsterixException {
+    public int getItemOffset(AbstractCollectionType inputType, int index) throws HyracksDataException {
         if (isFixedType(inputType)) {
             return getItemCountOffset() + getItemCountSize() + index * getFixedLength(inputType);
         } else {
@@ -172,14 +173,14 @@
         }
     }
 
-    public byte getItemTag(AbstractCollectionType inputType, int index) throws AsterixException {
+    public byte getItemTag(AbstractCollectionType inputType, int index) throws HyracksDataException {
         if (getType() != ATypeTag.ANY.serialize()) {
             return getType();
         }
         return bytes[getItemOffset(inputType, index)];
     }
 
-    public int getItemSize(AbstractCollectionType inputType, int index) throws AsterixException {
+    public int getItemSize(AbstractCollectionType inputType, int index) throws HyracksDataException {
         if (isFixedType(inputType)) {
             return getFixedLength(inputType);
         } else {
@@ -188,8 +189,8 @@
         }
     }
 
-    public void getItemValue(AbstractCollectionType inputType, int index, DataOutput dOut) throws IOException,
-            AsterixException {
+    public void getItemValue(AbstractCollectionType inputType, int index, DataOutput dOut)
+            throws IOException, AsterixException {
         if (getType() != ATypeTag.ANY.serialize()) {
             dOut.writeByte(getType());
         }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 190a3b2..0aa256f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.AbstractPointable;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IPointableFactory;
@@ -252,7 +253,7 @@
         return aType;
     }
 
-    public int getClosedFieldSize(ARecordType recordType, int fieldId) throws AsterixException {
+    public int getClosedFieldSize(ARecordType recordType, int fieldId) throws HyracksDataException {
         if (isClosedFieldNull(recordType, fieldId)) {
             return 0;
         }
@@ -294,7 +295,7 @@
         return getOpenFieldNameOffset(recordType, fieldId) + getOpenFieldNameSize(recordType, fieldId);
     }
 
-    public int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws AsterixException {
+    public int getOpenFieldValueSize(ARecordType recordType, int fieldId) throws HyracksDataException {
         int offset = getOpenFieldValueOffset(recordType, fieldId);
         ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(getOpenFieldTag(recordType, fieldId));
         return NonTaggedFormatUtil.getFieldValueLength(bytes, offset, tag, true);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java
index 37e8c05..01cea05 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/NonTaggedFormatUtil.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.om.utils;
 
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
@@ -39,6 +39,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
 import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import org.apache.hyracks.util.string.UTF8StringUtil;
@@ -106,12 +107,12 @@
     }
 
     public static int getFieldValueLength(byte[] serNonTaggedAObject, int offset, ATypeTag typeTag, boolean tagged)
-            throws AsterixException {
+            throws HyracksDataException {
         switch (typeTag) {
             case ANY:
                 ATypeTag tag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serNonTaggedAObject[offset]);
                 if (tag == ATypeTag.ANY) {
-                    throw new AsterixException("Field value has type tag ANY, but it should have a concrete type.");
+                    throw HyracksDataException.create(ErrorCode.FIELD_SHOULD_HAVE_CONCRETE_TYPE);
                 }
                 return getFieldValueLength(serNonTaggedAObject, offset, tag, true) + 1;
             case MISSING:
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
index 379c362..c07aae2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/AbstractAsterixListIterator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.runtime.evaluators.common;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
@@ -72,32 +71,24 @@
 
     @Override
     public void next() throws HyracksDataException {
-        try {
-            pos = nextPos;
-            ++count;
-            nextPos = startOff + listLength;
-            if (count + 1 < numberOfItems) {
-                nextPos = getItemOffset(data, startOff, count + 1);
-            }
-            itemLen = nextPos - pos;
-        } catch (AsterixException e) {
-            throw new HyracksDataException(e);
+        pos = nextPos;
+        ++count;
+        nextPos = startOff + listLength;
+        if (count + 1 < numberOfItems) {
+            nextPos = getItemOffset(data, startOff, count + 1);
         }
+        itemLen = nextPos - pos;
     }
 
     @Override
     public void reset() throws HyracksDataException {
         count = 0;
-        try {
-            pos = getItemOffset(data, startOff, count);
-            nextPos = startOff + listLength;
-            if (count + 1 < numberOfItems) {
-                nextPos = getItemOffset(data, startOff, count + 1);
-            }
-            itemLen = nextPos - pos;
-        } catch (AsterixException e) {
-            throw new HyracksDataException(e);
+        pos = getItemOffset(data, startOff, count);
+        nextPos = startOff + listLength;
+        if (count + 1 < numberOfItems) {
+            nextPos = getItemOffset(data, startOff, count + 1);
         }
+        itemLen = nextPos - pos;
     }
 
     @Override
@@ -153,7 +144,7 @@
         reset();
     }
 
-    protected abstract int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException;
+    protected abstract int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException;
 
     protected abstract int getNumberOfItems(byte[] serOrderedList, int offset);
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java
index c03cfa6..3c97fc9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/ListAccessor.java
@@ -22,7 +22,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
 import org.apache.asterix.om.types.ATypeTag;
@@ -76,7 +75,7 @@
         return size;
     }
 
-    public int getItemOffset(int itemIndex) throws AsterixException {
+    public int getItemOffset(int itemIndex) throws HyracksDataException {
         if (listType == ATypeTag.MULTISET) {
             return AUnorderedListSerializerDeserializer.getItemOffset(listBytes, start, itemIndex);
         } else {
@@ -84,12 +83,12 @@
         }
     }
 
-    public int getItemLength(int itemOffset) throws AsterixException {
+    public int getItemLength(int itemOffset) throws HyracksDataException {
         ATypeTag itemType = getItemType(itemOffset);
         return NonTaggedFormatUtil.getFieldValueLength(listBytes, itemOffset, itemType, itemsAreSelfDescribing());
     }
 
-    public ATypeTag getItemType(int itemOffset) throws AsterixException {
+    public ATypeTag getItemType(int itemOffset) {
         if (itemType == ATypeTag.ANY) {
             return EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(listBytes[itemOffset]);
         } else {
@@ -97,7 +96,7 @@
         }
     }
 
-    public void writeItem(int itemIndex, DataOutput dos) throws AsterixException, IOException {
+    public void writeItem(int itemIndex, DataOutput dos) throws IOException {
         int itemOffset = getItemOffset(itemIndex);
         int itemLength = getItemLength(itemOffset);
         if (itemsAreSelfDescribing()) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java
index f0c2f5a..c5add44 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/OrderedListIterator.java
@@ -18,13 +18,13 @@
  */
 package org.apache.asterix.runtime.evaluators.common;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public final class OrderedListIterator extends AbstractAsterixListIterator {
 
     @Override
-    protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+    protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
         return AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, itemIndex);
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
index f519065..b70c6ad0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/SimilarityJaccardPrefixEvaluator.java
@@ -21,7 +21,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
@@ -72,8 +71,8 @@
     // result
     protected final AMutableFloat res = new AMutableFloat(0);
     @SuppressWarnings("unchecked")
-    protected final ISerializerDeserializer<AFloat> reusSerde = SerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.AFLOAT);
+    protected final ISerializerDeserializer<AFloat> reusSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
 
     public SimilarityJaccardPrefixEvaluator(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
             throws HyracksDataException {
@@ -91,8 +90,8 @@
         // similarity threshold
         sim = 0;
         evalThreshold.evaluate(tuple, inputVal);
-        float similarityThreshold = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(),
-                inputVal.getStartOffset() + 1);
+        float similarityThreshold =
+                AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), inputVal.getStartOffset() + 1);
 
         if (similarityThreshold != similarityThresholdCache || similarityFilters == null) {
             similarityFilters = new SimilarityFiltersJaccard(similarityThreshold);
@@ -126,39 +125,23 @@
 
             int lengthTokens1;
             if (serList[startOffset] == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG) {
-                lengthTokens1 = AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
-                        startOffset);
+                lengthTokens1 =
+                        AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
                 // read tokens
                 for (i = 0; i < lengthTokens1; i++) {
-                    int itemOffset;
-                    int token;
-                    try {
-                        itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
-                    } catch (AsterixException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
-                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset,
-                                startOffset + 1);
+                    int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+                    int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset, startOffset + 1);
                     tokens1.add(token);
                 }
             } else {
-                lengthTokens1 = AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
-                        startOffset);
+                lengthTokens1 =
+                        AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
                 // read tokens
                 for (i = 0; i < lengthTokens1; i++) {
-                    int itemOffset;
-                    int token;
-
-                    try {
-                        itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
-                    } catch (AsterixException e) {
-                        throw new HyracksDataException(e);
-                    }
-
-                    token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
-                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset,
-                                startOffset + 1);
+                    int itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+                    int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 1, serList, itemOffset, startOffset + 1);
                     tokens1.add(token);
                 }
             }
@@ -181,39 +164,23 @@
 
             int lengthTokens2;
             if (serList[startOffset] == ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG) {
-                lengthTokens2 = AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
-                        startOffset);
+                lengthTokens2 =
+                        AOrderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
                 // read tokens
                 for (i = 0; i < lengthTokens2; i++) {
-                    int itemOffset;
-                    int token;
-
-                    try {
-                        itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
-                    } catch (AsterixException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
-                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset,
-                                startOffset + 1);
+                    int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+                    int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset, startOffset + 1);
                     tokens2.add(token);
                 }
             } else {
-                lengthTokens2 = AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(),
-                        startOffset);
+                lengthTokens2 =
+                        AUnorderedListSerializerDeserializer.getNumberOfItems(inputVal.getByteArray(), startOffset);
                 // read tokens
                 for (i = 0; i < lengthTokens2; i++) {
-                    int itemOffset;
-                    int token;
-
-                    try {
-                        itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
-                    } catch (AsterixException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
-                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset,
-                                startOffset + 1);
+                    int itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, startOffset, i);
+                    int token = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+                            BuiltinFunctions.SIMILARITY_JACCARD.getName(), 3, serList, itemOffset, startOffset + 1);
                     tokens2.add(token);
                 }
             }
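The evaluator above keeps a SimilarityFiltersJaccard instance keyed on the last-seen threshold and rebuilds it only when the threshold changes between tuples. A stripped-down sketch of that memoization, with hypothetical stand-ins ('Object' for the filter type, string construction for the real build):

```java
public final class ThresholdCacheSketch {
    private float thresholdCache = Float.NaN; // NaN forces a build on first use
    private Object filters;

    Object filtersFor(float threshold) {
        // Rebuild only when the threshold changes between tuples; with a
        // constant threshold every call after the first is a cache hit.
        if (filters == null || threshold != thresholdCache) {
            filters = buildFilters(threshold);
            thresholdCache = threshold;
        }
        return filters;
    }

    private Object buildFilters(float threshold) {
        return "filters@" + threshold; // hypothetical stand-in
    }
}
```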
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java
index ee58011..b755da2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/UnorderedListIterator.java
@@ -18,13 +18,13 @@
  */
 package org.apache.asterix.runtime.evaluators.common;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public final class UnorderedListIterator extends AbstractAsterixListIterator {
 
     @Override
-    protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws AsterixException {
+    protected int getItemOffset(byte[] serOrderedList, int offset, int itemIndex) throws HyracksDataException {
         return AUnorderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, itemIndex);
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java
index 714fabe..b78bc5c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AnyCollectionMemberDescriptor.java
@@ -21,7 +21,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
 import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -96,9 +95,8 @@
 
                     if (serList[offset] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
                             && serList[offset] != ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
-                        throw new TypeMismatchException(BuiltinFunctions.ANY_COLLECTION_MEMBER, 0,
-                                serList[offset], ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG,
-                                ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
+                        throw new TypeMismatchException(BuiltinFunctions.ANY_COLLECTION_MEMBER, 0, serList[offset],
+                                ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG, ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
                     }
 
                     try {
@@ -127,8 +125,8 @@
 
                         if (selfDescList) {
                             itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[itemOffset]);
-                            itemLength = NonTaggedFormatUtil.getFieldValueLength(serList, itemOffset, itemTag, true)
-                                    + 1;
+                            itemLength =
+                                    NonTaggedFormatUtil.getFieldValueLength(serList, itemOffset, itemTag, true) + 1;
                             result.set(serList, itemOffset, itemLength);
                         } else {
                             itemLength = NonTaggedFormatUtil.getFieldValueLength(serList, itemOffset, itemTag, false);
@@ -137,9 +135,7 @@
                             result.set(resultStorage);
                         }
                     } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    } catch (AsterixException e) {
-                        throw new HyracksDataException(e);
+                        throw HyracksDataException.create(e);
                     }
                 }
             };
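This file also switches from new HyracksDataException(e) to the HyracksDataException.create(e) factory. A sketch of the resulting catch idiom; it assumes, as the call sites here rely on, that create hands back an argument that is already a HyracksDataException unchanged rather than wrapping it again:

```java
import java.io.IOException;

import org.apache.hyracks.api.exceptions.HyracksDataException;

public final class ExceptionIdiomSketch {
    interface IoBody {
        void run() throws IOException;
    }

    static void runWrapped(IoBody body) throws HyracksDataException {
        try {
            body.run();
        } catch (IOException e) {
            // Assumed factory behavior: an 'e' that is already a
            // HyracksDataException comes back as-is; anything else is
            // wrapped once. 'new HyracksDataException(e)' would nest it.
            throw HyracksDataException.create(e);
        }
    }
}
```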
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java
index 09e15b5..a59d03d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CodePointToStringDescriptor.java
@@ -21,7 +21,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -99,39 +98,32 @@
                                         throw new UnsupportedTypeException(getIdentifier(), serOrderedList[offset]);
                                 }
                             }
-
-                            try {
-                                // calculate length first
-                                int utf_8_len = 0;
-                                for (int i = 0; i < size; i++) {
-                                    int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serOrderedList,
-                                            offset, i);
-                                    int codePoint = 0;
-                                    codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
-                                            getIdentifier().getName(), 0,
-                                            serOrderedList, itemOffset, offset + 1);
-                                    utf_8_len += UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
-                                }
-                                out.writeByte(stringTypeTag);
-                                UTF8StringUtil.writeUTF8Length(utf_8_len, tempStoreForLength, out);
-                                for (int i = 0; i < size; i++) {
-                                    int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serOrderedList,
-                                            offset, i);
-                                    int codePoint = 0;
-                                    codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
-                                            getIdentifier().getName(), 0,
-                                            serOrderedList, itemOffset, offset + 1);
-                                    utf_8_len = UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
-                                    for (int j = 0; j < utf_8_len; j++) {
-                                        out.writeByte(currentUTF8[j]);
-                                    }
-                                }
-                                result.set(resultStorage);
-                            } catch (AsterixException ex) {
-                                throw new HyracksDataException(ex);
+                            // calculate length first
+                            int utf_8_len = 0;
+                            for (int i = 0; i < size; i++) {
+                                int itemOffset =
+                                        AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, i);
+                                int codePoint = 0;
+                                codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+                                        getIdentifier().getName(), 0, serOrderedList, itemOffset, offset + 1);
+                                utf_8_len += UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
                             }
-                        } catch (IOException e1) {
-                            throw new HyracksDataException(e1);
+                            out.writeByte(stringTypeTag);
+                            UTF8StringUtil.writeUTF8Length(utf_8_len, tempStoreForLength, out);
+                            for (int i = 0; i < size; i++) {
+                                int itemOffset =
+                                        AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, i);
+                                int codePoint = 0;
+                                codePoint = ATypeHierarchy.getIntegerValueWithDifferentTypeTagPosition(
+                                        getIdentifier().getName(), 0, serOrderedList, itemOffset, offset + 1);
+                                utf_8_len = UTF8StringUtil.codePointToUTF8(codePoint, currentUTF8);
+                                for (int j = 0; j < utf_8_len; j++) {
+                                    out.writeByte(currentUTF8[j]);
+                                }
+                            }
+                            result.set(resultStorage);
+                        } catch (IOException e) {
+                            throw HyracksDataException.create(e);
                         }
                     }
                 };
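The body above is a two-pass write: because the UTF-8 length header precedes the bytes, the code points are walked once to total the encoded length and once more to emit the bytes. A self-contained sketch of the same pattern, using JDK encoding and a fixed-width header in place of UTF8StringUtil:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public final class TwoPassUtf8Sketch {
    static void writeCodePoints(int[] codePoints, DataOutput out) throws IOException {
        // Pass 1: total the encoded length so the header can be written first.
        int utf8Len = 0;
        for (int cp : codePoints) {
            utf8Len += new String(Character.toChars(cp)).getBytes(StandardCharsets.UTF_8).length;
        }
        out.writeShort(utf8Len); // fixed-width stand-in for the variable-length header
        // Pass 2: emit the bytes themselves.
        for (int cp : codePoints) {
            out.write(new String(Character.toChars(cp)).getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writeCodePoints(new int[] { 'h', 'i', 0x1F600 }, new DataOutputStream(bytes));
        System.out.println(bytes.size() + " bytes"); // 2 header + 6 payload bytes
    }
}
```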
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java
index 76d9cf3..74f8757 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CreatePolygonDescriptor.java
@@ -21,7 +21,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.APointSerializerDeserializer;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -75,10 +74,10 @@
                     private final IPointable inputArgList = new VoidPointable();
                     private final IScalarEvaluator evalList = listEvalFactory.createScalarEvaluator(ctx);
                     @SuppressWarnings("unchecked")
-                    private final ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private final ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.
-                            INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
+                    private final ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    private final ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -95,51 +94,46 @@
                                         ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
                             }
                             listAccessor.reset(listBytes, offset);
-                            try {
-                                // First check the list consists of a valid items
-                                for (int i = 0; i < listAccessor.size(); i++) {
-                                    int itemOffset = listAccessor.getItemOffset(i);
-                                    ATypeTag itemType = listAccessor.getItemType(itemOffset);
-                                    if (itemType != ATypeTag.DOUBLE) {
-                                        if (itemType == ATypeTag.NULL) {
-                                            nullSerde.serialize(ANull.NULL, out);
-                                            return;
-                                        }
-                                        if (itemType == ATypeTag.MISSING) {
-                                            missingSerde.serialize(AMissing.MISSING, out);
-                                            return;
-                                        }
-                                        throw new UnsupportedItemTypeException(BuiltinFunctions.CREATE_POLYGON,
-                                                itemType.serialize());
+                            // First check that the list consists of valid items
+                            for (int i = 0; i < listAccessor.size(); i++) {
+                                int itemOffset = listAccessor.getItemOffset(i);
+                                ATypeTag itemType = listAccessor.getItemType(itemOffset);
+                                if (itemType != ATypeTag.DOUBLE) {
+                                    if (itemType == ATypeTag.NULL) {
+                                        nullSerde.serialize(ANull.NULL, out);
+                                        return;
                                     }
-
+                                    if (itemType == ATypeTag.MISSING) {
+                                        missingSerde.serialize(AMissing.MISSING, out);
+                                        return;
+                                    }
+                                    throw new UnsupportedItemTypeException(BuiltinFunctions.CREATE_POLYGON,
+                                            itemType.serialize());
                                 }
-                                if (listAccessor.size() < 6) {
-                                    throw new InvalidDataFormatException(getIdentifier(),
-                                            ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
-                                } else if (listAccessor.size() % 2 != 0) {
-                                    throw new InvalidDataFormatException(getIdentifier(),
-                                            ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
-                                }
-                                out.writeByte(ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
-                                out.writeShort(listAccessor.size() / 2);
 
-                                final int skipTypeTag = listAccessor.itemsAreSelfDescribing() ? 1 : 0;
-                                for (int i = 0; i < listAccessor.size() / 2; i++) {
-                                    int firstDoubleOffset = listAccessor.getItemOffset(i * 2) + skipTypeTag;
-                                    int secondDobuleOffset = listAccessor.getItemOffset((i * 2) + 1) + skipTypeTag;
-
-                                    APointSerializerDeserializer.serialize(
-                                            ADoubleSerializerDeserializer.getDouble(listBytes, firstDoubleOffset),
-                                            ADoubleSerializerDeserializer.getDouble(listBytes, secondDobuleOffset),
-                                            out);
-                                }
-                                result.set(resultStorage);
-                            } catch (AsterixException ex) {
-                                throw new HyracksDataException(ex);
                             }
-                        } catch (IOException e1) {
-                            throw new HyracksDataException(e1);
+                            if (listAccessor.size() < 6) {
+                                throw new InvalidDataFormatException(getIdentifier(),
+                                        ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
+                            } else if (listAccessor.size() % 2 != 0) {
+                                throw new InvalidDataFormatException(getIdentifier(),
+                                        ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
+                            }
+                            out.writeByte(ATypeTag.SERIALIZED_POLYGON_TYPE_TAG);
+                            out.writeShort(listAccessor.size() / 2);
+
+                            final int skipTypeTag = listAccessor.itemsAreSelfDescribing() ? 1 : 0;
+                            for (int i = 0; i < listAccessor.size() / 2; i++) {
+                                int firstDoubleOffset = listAccessor.getItemOffset(i * 2) + skipTypeTag;
+                                int secondDoubleOffset = listAccessor.getItemOffset((i * 2) + 1) + skipTypeTag;
+
+                                APointSerializerDeserializer.serialize(
+                                        ADoubleSerializerDeserializer.getDouble(listBytes, firstDoubleOffset),
+                                        ADoubleSerializerDeserializer.getDouble(listBytes, secondDoubleOffset), out);
+                            }
+                            result.set(resultStorage);
+                        } catch (IOException e) {
+                            throw HyracksDataException.create(e);
                         }
                     }
                 };
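create-polygon reads the flat list of doubles as (x, y) pairs, so the validation above demands an even item count of at least six, i.e. three points. The same check condensed into a sketch; IllegalArgumentException stands in for InvalidDataFormatException:

```java
public final class PolygonAritySketch {
    static int checkPolygonArity(int listSize) {
        if (listSize < 6 || listSize % 2 != 0) {
            throw new IllegalArgumentException(
                    "expected an even number of doubles, at least 6; got " + listSize);
        }
        return listSize / 2; // number of points that will be serialized
    }
}
```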
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
index 276c9ae..0c55ef1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/GetItemDescriptor.java
@@ -21,7 +21,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -106,8 +105,8 @@
                             itemIndex = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GET_ITEM.getName(), 0,
                                     indexBytes, indexOffset);
                         } else {
-                            throw new TypeMismatchException(BuiltinFunctions.GET_ITEM,
-                                    0, serOrderedList[offset], ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
+                            throw new TypeMismatchException(BuiltinFunctions.GET_ITEM, 0, serOrderedList[offset],
+                                    ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
                         }
 
                         if (itemIndex < 0 || itemIndex >= AOrderedListSerializerDeserializer
@@ -124,26 +123,25 @@
                             serItemTypeTag = serOrderedList[offset + 1];
                         }
 
-                        itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset,
-                                itemIndex);
+                        itemOffset =
+                                AOrderedListSerializerDeserializer.getItemOffset(serOrderedList, offset, itemIndex);
 
                         if (selfDescList) {
                             itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serOrderedList[itemOffset]);
-                            itemLength = NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag,
-                                    true) + 1;
+                            itemLength =
+                                    NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag, true)
+                                            + 1;
                             result.set(serOrderedList, itemOffset, itemLength);
                         } else {
-                            itemLength = NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag,
-                                    false);
+                            itemLength =
+                                    NonTaggedFormatUtil.getFieldValueLength(serOrderedList, itemOffset, itemTag, false);
                             resultStorage.reset();
                             output.writeByte(serItemTypeTag);
                             output.write(serOrderedList, itemOffset, itemLength);
                             result.set(resultStorage);
                         }
                     } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    } catch (AsterixException e) {
-                        throw new HyracksDataException(e);
+                        throw HyracksDataException.create(e);
                     }
                 }
             };
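get-item returns its result differently depending on whether the list is self-describing: heterogeneous items carry their own type tag and can be sliced out as-is (hence the +1 on the length), while homogeneous items need the list-wide tag re-attached in front. A sketch of that branch, with a plain byte buffer standing in for the pointable/result machinery:

```java
import java.io.ByteArrayOutputStream;

public final class ItemSliceSketch {
    static byte[] itemBytes(byte[] list, int itemOffset, int valueLength, byte listWideTag,
            boolean selfDescribing) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (selfDescribing) {
            // The item starts with its own tag byte; valueLength counts only
            // the value, so the +1 mirrors the evaluator's length math.
            out.write(list, itemOffset, valueLength + 1);
        } else {
            // Homogeneous list: re-attach the list-wide tag in front of the
            // raw value bytes.
            out.write(listWideTag);
            out.write(list, itemOffset, valueLength);
        }
        return out.toByteArray();
    }
}
```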
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java
index 385b2c1..8584d06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SimilarityDescriptor.java
@@ -21,7 +21,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionConstants;
 import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
@@ -33,13 +32,13 @@
 import org.apache.asterix.fuzzyjoin.similarity.SimilarityMetric;
 import org.apache.asterix.om.base.ADouble;
 import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
 import org.apache.asterix.runtime.evaluators.common.SimilarityFiltersCache;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -55,8 +54,8 @@
 public class SimilarityDescriptor extends AbstractScalarFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    private final static FunctionIdentifier FID = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "similarity@7",
-            7);
+    private final static FunctionIdentifier FID =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "similarity@7", 7);
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
@@ -94,8 +93,8 @@
                     // result
                     private final AMutableDouble res = new AMutableDouble(0);
                     @SuppressWarnings("unchecked")
-                    private final ISerializerDeserializer<ADouble> doubleSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ADOUBLE);
+                    private final ISerializerDeserializer<ADouble> doubleSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -120,8 +119,8 @@
                             throw new TypeMismatchException(getIdentifier(), 1, data[offset],
                                     ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
                         }
-                        SimilarityFilters similarityFilters = similarityFiltersCache.get(similarityThreshold, data,
-                                offset, len);
+                        SimilarityFilters similarityFilters =
+                                similarityFiltersCache.get(similarityThreshold, data, offset, len);
 
                         evalLen1.evaluate(tuple, inputVal);
                         data = inputVal.getByteArray();
@@ -167,26 +166,16 @@
                                 lengthTokens1 = AOrderedListSerializerDeserializer.getNumberOfItems(serList, offset);
                                 // read tokens
                                 for (i = 0; i < lengthTokens1; i++) {
-                                    int itemOffset;
-                                    try {
-                                        itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, offset,
-                                                i);
-                                    } catch (AsterixException e) {
-                                        throw new HyracksDataException(e);
-                                    }
+                                    int itemOffset =
+                                            AOrderedListSerializerDeserializer.getItemOffset(serList, offset, i);
                                     tokens1.add(IntegerPointable.getInteger(serList, itemOffset));
                                 }
                             } else {
                                 lengthTokens1 = AUnorderedListSerializerDeserializer.getNumberOfItems(serList, offset);
                                 // read tokens
                                 for (i = 0; i < lengthTokens1; i++) {
-                                    int itemOffset;
-                                    try {
-                                        itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, offset,
-                                                i);
-                                    } catch (AsterixException e) {
-                                        throw new HyracksDataException(e);
-                                    }
+                                    int itemOffset =
+                                            AUnorderedListSerializerDeserializer.getItemOffset(serList, offset, i);
                                     tokens1.add(IntegerPointable.getInteger(serList, itemOffset));
                                 }
                             }
@@ -213,26 +202,16 @@
                                 lengthTokens2 = AOrderedListSerializerDeserializer.getNumberOfItems(serList, offset);
                                 // read tokens
                                 for (i = 0; i < lengthTokens2; i++) {
-                                    int itemOffset;
-                                    try {
-                                        itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, offset,
-                                                i);
-                                    } catch (AsterixException e) {
-                                        throw new HyracksDataException(e);
-                                    }
+                                    int itemOffset =
+                                            AOrderedListSerializerDeserializer.getItemOffset(serList, offset, i);
                                     tokens2.add(IntegerPointable.getInteger(serList, itemOffset));
                                 }
                             } else {
                                 lengthTokens2 = AUnorderedListSerializerDeserializer.getNumberOfItems(serList, offset);
                                 // read tokens
                                 for (i = 0; i < lengthTokens2; i++) {
-                                    int itemOffset;
-                                    try {
-                                        itemOffset = AUnorderedListSerializerDeserializer.getItemOffset(serList, offset,
-                                                i);
-                                    } catch (AsterixException e) {
-                                        throw new HyracksDataException(e);
-                                    }
+                                    int itemOffset =
+                                            AUnorderedListSerializerDeserializer.getItemOffset(serList, offset, i);
                                     tokens2.add(IntegerPointable.getInteger(serList, itemOffset));
                                 }
                             }
@@ -243,8 +222,8 @@
 
                             // -- - token prefix - --
                             evalTokenPrefix.evaluate(tuple, inputVal);
-                            int tokenPrefix = IntegerPointable.getInteger(inputVal.getByteArray(),
-                                    inputVal.getStartOffset() + 1);
+                            int tokenPrefix =
+                                    IntegerPointable.getInteger(inputVal.getByteArray(), inputVal.getStartOffset() + 1);
 
                             //
                             // -- - position filter - --
@@ -272,7 +251,7 @@
                         try {
                             doubleSerde.serialize(res, out);
                         } catch (IOException e) {
-                            throw new HyracksDataException(e);
+                            throw HyracksDataException.create(e);
                         }
                         result.set(resultStorage);
                     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
index 8ad3255..851e1a4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
@@ -21,11 +21,9 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMissing;
 import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
@@ -33,6 +31,7 @@
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
 import org.apache.asterix.runtime.evaluators.common.ListAccessor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -72,10 +71,10 @@
                     private final IPointable inputArgList = new VoidPointable();
                     private final IScalarEvaluator evalList = listEvalFactory.createScalarEvaluator(ctx);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AMISSING);
+                    private ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    private ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
                     private final byte[] tempLengthArray = new byte[5];
 
                     @Override
@@ -93,49 +92,45 @@
                                         ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
                             }
                             listAccessor.reset(listBytes, listOffset);
-                            try {
-                                // calculate length first
-                                int utf8Len = 0;
-                                for (int i = 0; i < listAccessor.size(); i++) {
-                                    int itemOffset = listAccessor.getItemOffset(i);
-                                    ATypeTag itemType = listAccessor.getItemType(itemOffset);
-                                    // Increase the offset by 1 if the give list has heterogeneous elements,
-                                    // since the item itself has a typetag.
-                                    if (listAccessor.itemsAreSelfDescribing()) {
-                                        itemOffset += 1;
-                                    }
-                                    if (itemType != ATypeTag.STRING) {
-                                        if (itemType == ATypeTag.NULL) {
-                                            nullSerde.serialize(ANull.NULL, out);
-                                            result.set(resultStorage);
-                                            return;
-                                        }
-                                        if (itemType == ATypeTag.MISSING) {
-                                            missingSerde.serialize(AMissing.MISSING, out);
-                                            result.set(resultStorage);
-                                            return;
-                                        }
-                                        throw new UnsupportedItemTypeException(getIdentifier(), itemType.serialize());
-                                    }
-                                    utf8Len += UTF8StringUtil.getUTFLength(listBytes, itemOffset);
+                            // calculate length first
+                            int utf8Len = 0;
+                            for (int i = 0; i < listAccessor.size(); i++) {
+                                int itemOffset = listAccessor.getItemOffset(i);
+                                ATypeTag itemType = listAccessor.getItemType(itemOffset);
+                                // Increase the offset by 1 if the given list has heterogeneous elements,
+                                // since the item itself carries a type tag.
+                                if (listAccessor.itemsAreSelfDescribing()) {
+                                    itemOffset += 1;
                                 }
-                                out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
-                                int cbytes = UTF8StringUtil.encodeUTF8Length(utf8Len, tempLengthArray, 0);
-                                out.write(tempLengthArray, 0, cbytes);
-                                for (int i = 0; i < listAccessor.size(); i++) {
-                                    int itemOffset = listAccessor.getItemOffset(i);
-                                    if (listAccessor.itemsAreSelfDescribing()) {
-                                        itemOffset += 1;
+                                if (itemType != ATypeTag.STRING) {
+                                    if (itemType == ATypeTag.NULL) {
+                                        nullSerde.serialize(ANull.NULL, out);
+                                        result.set(resultStorage);
+                                        return;
                                     }
-                                    utf8Len = UTF8StringUtil.getUTFLength(listBytes, itemOffset);
-                                    out.write(listBytes, UTF8StringUtil.getNumBytesToStoreLength(utf8Len) + itemOffset,
-                                            utf8Len);
+                                    if (itemType == ATypeTag.MISSING) {
+                                        missingSerde.serialize(AMissing.MISSING, out);
+                                        result.set(resultStorage);
+                                        return;
+                                    }
+                                    throw new UnsupportedItemTypeException(getIdentifier(), itemType.serialize());
                                 }
-                            } catch (AsterixException ex) {
-                                throw new HyracksDataException(ex);
+                                utf8Len += UTF8StringUtil.getUTFLength(listBytes, itemOffset);
                             }
-                        } catch (IOException e1) {
-                            throw new HyracksDataException(e1);
+                            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+                            int cbytes = UTF8StringUtil.encodeUTF8Length(utf8Len, tempLengthArray, 0);
+                            out.write(tempLengthArray, 0, cbytes);
+                            for (int i = 0; i < listAccessor.size(); i++) {
+                                int itemOffset = listAccessor.getItemOffset(i);
+                                if (listAccessor.itemsAreSelfDescribing()) {
+                                    itemOffset += 1;
+                                }
+                                utf8Len = UTF8StringUtil.getUTFLength(listBytes, itemOffset);
+                                out.write(listBytes, UTF8StringUtil.getNumBytesToStoreLength(utf8Len) + itemOffset,
+                                        utf8Len);
+                            }
+                        } catch (IOException e) {
+                            throw HyracksDataException.create(e);
                         }
                         result.set(resultStorage);
                     }
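string-concat scans the items in order before committing to a result: the first NULL item makes the whole result null, the first MISSING makes it missing, and any other non-string item is a type error. The decision logic in isolation, with an illustrative enum in place of ATypeTag:

```java
public final class ConcatTagSketch {
    enum Tag { STRING, NULL, MISSING, INT64 } // illustrative subset of ATypeTag

    static Tag concatResultTag(Tag[] items) {
        for (Tag t : items) {
            if (t == Tag.NULL) {
                return Tag.NULL; // first null item nullifies the result
            }
            if (t == Tag.MISSING) {
                return Tag.MISSING; // first missing item wins the same way
            }
            if (t != Tag.STRING) {
                throw new IllegalArgumentException("unsupported item type: " + t);
            }
        }
        return Tag.STRING;
    }
}
```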
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java
index 6cfdffc..70b3687 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringJoinDescriptor.java
@@ -21,7 +21,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMissing;
 import org.apache.asterix.om.base.ANull;
@@ -74,10 +73,10 @@
                     private final IScalarEvaluator evalList = listEvalFactory.createScalarEvaluator(ctx);
                     private final IScalarEvaluator evalSep = sepEvalFactory.createScalarEvaluator(ctx);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
-                    private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AMISSING);
+                    private ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
+                    private ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
                     private final byte[] tempLengthArray = new byte[5];
 
                     @Override
@@ -151,8 +150,8 @@
                                     out.writeByte(sepBytes[sepOffset + 1 + sepMetaLen + j]);
                                 }
                             }
-                        } catch (IOException | AsterixException ex) {
-                            throw new HyracksDataException(ex);
+                        } catch (IOException ex) {
+                            throw HyracksDataException.create(ex);
                         }
                         result.set(resultStorage);
                     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
index b394062..907dfd3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
@@ -21,7 +21,6 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMissing;
 import org.apache.asterix.om.base.ANull;
@@ -72,11 +71,11 @@
                     private final ListAccessor listAccessor = new ListAccessor();
                     private final byte[] metaBuffer = new byte[5];
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
+                    private ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AMISSING);
+                    private ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -118,9 +117,7 @@
                                         itemOffset + ByteArrayPointable.getNumberBytesToStoreMeta(length), length);
                             }
                         } catch (IOException e) {
-                            throw new HyracksDataException(e);
-                        } catch (AsterixException e) {
-                            throw new HyracksDataException(e);
+                            throw HyracksDataException.create(e);
                         }
                         result.set(resultStorage);
                     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
index 3321efc..d5918cb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByIndexEvalFactory.java
@@ -21,9 +21,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -31,6 +29,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -90,15 +89,15 @@
                     int offset = inputArg0.getStartOffset();
 
                     if (serRecord[offset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
-                        throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 0,
-                                serRecord[offset], ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+                        throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 0, serRecord[offset],
+                                ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
                     }
                     eval1.evaluate(tuple, inputArg1);
                     byte[] indexBytes = inputArg1.getByteArray();
                     int indexOffset = inputArg1.getStartOffset();
                     if (indexBytes[indexOffset] != ATypeTag.SERIALIZED_INT32_TYPE_TAG) {
-                        throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 1,
-                                indexBytes[offset], ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+                        throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_BY_INDEX, 1,
+                                indexBytes[indexOffset], ATypeTag.SERIALIZED_INT32_TYPE_TAG);
                     }
                     fieldIndex = IntegerPointable.getInteger(indexBytes, indexOffset + 1);
                     fieldValueType = recordType.getFieldTypes()[fieldIndex];
@@ -137,9 +136,7 @@
                     out.write(serRecord, fieldValueOffset, fieldValueLength);
                     result.set(resultStorage);
                 } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                } catch (AsterixException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
         };
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
index 87fa292..2078921 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessByNameEvalFactory.java
@@ -21,13 +21,12 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -94,9 +93,7 @@
                             fieldValueTypeTag, true) + 1;
                     result.set(serRecord, fieldValueOffset, fieldValueLength);
                 } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                } catch (AsterixException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
         };
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
index 2da8e90..067a458 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/FieldAccessNestedEvalFactory.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMissing;
@@ -80,11 +79,11 @@
             private final IPointable[] fieldPointables = new VoidPointable[fieldPath.size()];
             private final RuntimeRecordTypeInfo[] recTypeInfos = new RuntimeRecordTypeInfo[fieldPath.size()];
             @SuppressWarnings("unchecked")
-            private final ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(BuiltinType.ANULL);
+            private final ISerializerDeserializer<ANull> nullSerde =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
             @SuppressWarnings("unchecked")
-            private final ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(BuiltinType.AMISSING);
+            private final ISerializerDeserializer<AMissing> missingSerde =
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
 
             {
                 generateFieldsPointables();
@@ -100,8 +99,7 @@
                     ArrayBackedValueStorage storage = new ArrayBackedValueStorage();
                     DataOutput out = storage.getDataOutput();
                     AString as = new AString(fieldPath.get(i));
-                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as,
-                                out);
+                    SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(as.getType()).serialize(as, out);
                     fieldPointables[i] = new VoidPointable();
                     fieldPointables[i].set(storage);
                 }
@@ -118,8 +116,8 @@
                     int len = inputArg0.getLength();
 
                     if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
-                        throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_NESTED, 0,
-                                serRecord[start], ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+                        throw new TypeMismatchException(BuiltinFunctions.FIELD_ACCESS_NESTED, 0, serRecord[start],
+                                ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
                     }
 
                     int subFieldIndex = -1;
@@ -141,8 +139,7 @@
                             subType = ((AUnionType) subType).getActualType();
                             byte serializedTypeTag = subType.getTypeTag().serialize();
                             if (serializedTypeTag != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
-                                throw new UnsupportedTypeException(
-                                        BuiltinFunctions.FIELD_ACCESS_NESTED.getName(),
+                                throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED.getName(),
                                         serializedTypeTag);
                             }
                             if (subType.getTypeTag() == ATypeTag.OBJECT) {
@@ -198,8 +195,7 @@
                         // type check
                         if (pathIndex < fieldPointables.length - 1
                                 && serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
-                            throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED,
-                                    serRecord[start]);
+                            throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED, serRecord[start]);
                         }
                     }
 
@@ -215,8 +211,9 @@
                         }
 
                         subTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[subFieldOffset]);
-                        subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag,
-                                true) + 1;
+                        subFieldLength =
+                                NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, subTypeTag, true)
+                                        + 1;
 
                         if (pathIndex >= fieldPointables.length - 1) {
                             continue;
@@ -232,8 +229,8 @@
                             return;
                         }
                         if (serRecord[start] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
-                                throw new UnsupportedTypeException(
-                                    BuiltinFunctions.FIELD_ACCESS_NESTED.getName(), serRecord[start]);
+                            throw new UnsupportedTypeException(BuiltinFunctions.FIELD_ACCESS_NESTED.getName(),
+                                    serRecord[start]);
                         }
                     }
                     // emit the final result.
@@ -244,8 +241,8 @@
                         out.write(serRecord, subFieldOffset, subFieldLength);
                         result.set(resultStorage);
                     }
-                } catch (IOException | AsterixException e) {
-                    throw new HyracksDataException(e);
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
                 }
             }
         };
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
index c162e3d..95fb91a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/GetRecordFieldValueEvalFactory.java
@@ -21,9 +21,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -31,6 +29,7 @@
 import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -107,8 +106,8 @@
                             return;
                         }
                         ATypeTag fieldTypeTag = recordType.getFieldTypes()[subFieldIndex].getTypeTag();
-                        subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
-                                fieldTypeTag, false);
+                        subFieldLength =
+                                NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, fieldTypeTag, false);
                         // write result.
                         out.writeByte(fieldTypeTag.serialize());
                         out.write(serRecord, subFieldOffset, subFieldLength);
@@ -125,16 +124,15 @@
                         return;
                     }
                     // Get the field length.
-                    ATypeTag fieldValueTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                            .deserialize(serRecord[subFieldOffset]);
-                    subFieldLength = NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset,
-                            fieldValueTypeTag, true) + 1;
+                    ATypeTag fieldValueTypeTag =
+                            EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serRecord[subFieldOffset]);
+                    subFieldLength =
+                            NonTaggedFormatUtil.getFieldValueLength(serRecord, subFieldOffset, fieldValueTypeTag, true)
+                                    + 1;
                     // write result.
                     result.set(serRecord, subFieldOffset, subFieldLength);
                 } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                } catch (AsterixException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
         };
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 0a11684..1f41c82 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -199,7 +199,7 @@
         try {
             if (ctx.getSharedObject() != null) {
                 PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
-                TaskUtil.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+                TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
             }
             missingTupleBuilder = new ArrayTupleBuilder(1);
             DataOutput out = missingTupleBuilder.getDataOutput();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 85e92c3..3957c06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -20,8 +20,8 @@
 
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILockManager;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
index 377ad60..25306a6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/NoTupleSourceRuntimeFactory.java
@@ -41,6 +41,9 @@
             public void open() throws HyracksDataException {
                 try {
                     writer.open();
+                } catch (Throwable e) {
+                    writer.fail();
+                    throw e;
                 } finally {
                     writer.close();
                 }
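
The added catch clause above matters because close() alone does not signal failure: if open()
throws and only the finally block runs, the downstream writer would be closed without ever
seeing fail(). A condensed driver showing the call order this runtime now guarantees (the
class and method names are illustrative):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    final class OpenFailCloseSketch {
        // Order on failure: open() -> fail() -> close(); on success: open() -> close().
        static void run(IFrameWriter writer) throws HyracksDataException {
            try {
                writer.open();
                // this runtime produces no tuples, so there is no nextFrame() call
            } catch (Throwable e) {
                writer.fail(); // tell downstream about the failure before closing it
                throw e;       // precise rethrow keeps the declared exception type
            } finally {
                writer.close(); // close() is always called exactly once
            }
        }
    }
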
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 8ee10dc..3409e61 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -21,13 +21,12 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.runtime.evaluators.common.ListAccessor;
+import org.apache.asterix.runtime.exceptions.TypeMismatchException;
 import org.apache.asterix.runtime.unnestingfunctions.base.AbstractUnnestingFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -112,9 +111,7 @@
                             }
                         }
                     } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    } catch (AsterixException e) {
-                        throw new HyracksDataException(e);
+                        throw HyracksDataException.create(e);
                     }
                     return false;
                 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
index 84a59ac..47fbc4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/SubsetCollectionDescriptor.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
@@ -80,8 +79,7 @@
                         try {
                             evalStart.evaluate(tuple, inputVal);
                             posStart = ATypeHierarchy.getIntegerValue(getIdentifier().getName(), 0,
-                                    inputVal.getByteArray(),
-                                    inputVal.getStartOffset());
+                                    inputVal.getByteArray(), inputVal.getStartOffset());
 
                             evalLen.evaluate(tuple, inputVal);
                             numItems = ATypeHierarchy.getIntegerValue(getIdentifier().getName(), 1,
@@ -128,8 +126,8 @@
                             int offset = inputVal.getStartOffset();
                             int itemLength = 0;
                             try {
-                                int itemOffset = AOrderedListSerializerDeserializer.getItemOffset(serList, offset,
-                                        posCrt);
+                                int itemOffset =
+                                        AOrderedListSerializerDeserializer.getItemOffset(serList, offset, posCrt);
                                 if (selfDescList) {
                                     itemTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serList[itemOffset]);
                                 }
@@ -141,9 +139,7 @@
                                 resultStorage.getDataOutput().write(serList, itemOffset,
                                         itemLength + (!selfDescList ? 0 : 1));
                             } catch (IOException e) {
-                                throw new HyracksDataException(e);
-                            } catch (AsterixException e) {
-                                throw new HyracksDataException(e);
+                                throw HyracksDataException.create(e);
                             }
                             result.set(resultStorage);
                             ++posCrt;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 71acecf..d0b7b47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -21,28 +21,20 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
 
 public class SinkPOperator extends AbstractPhysicalOperator {
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
new file mode 100644
index 0000000..26002e6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class EnforcePushRuntime extends EnforceFrameWriter implements IPushRuntime {
+
+    private final IPushRuntime pushRuntime;
+
+    private EnforcePushRuntime(IPushRuntime pushRuntime) {
+        super(pushRuntime);
+        this.pushRuntime = pushRuntime;
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        pushRuntime.setOutputFrameWriter(index, writer, recordDesc);
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        pushRuntime.setInputRecordDescriptor(index, recordDescriptor);
+    }
+
+    public static IPushRuntime enforce(IPushRuntime pushRuntime) {
+        return pushRuntime instanceof EnforcePushRuntime || pushRuntime instanceof NestedTupleSourceRuntime
+                ? pushRuntime : new EnforcePushRuntime(pushRuntime);
+    }
+}
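
EnforcePushRuntime.enforce() is safe to call unconditionally: it never double-wraps, and it
exempts NestedTupleSourceRuntime because the aggregator factories cast the assembled pipeline
head back to that concrete type. A sketch of the intended call site, guarded by the job flag
(the class name is illustrative, and createPushRuntime is assumed to declare
HyracksDataException as in the assemblers changed below):

    import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
    import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
    import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.job.JobFlag;

    final class EnforceWiringSketch {
        static IPushRuntime create(IPushRuntimeFactory factory, IHyracksTaskContext ctx)
                throws HyracksDataException {
            IPushRuntime runtime = factory.createPushRuntime(ctx);
            // wrap only when the job asked for contract enforcement
            return ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT)
                    ? EnforcePushRuntime.enforce(runtime) : runtime;
        }
    }
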
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
index 27d6900..d33927b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -22,7 +22,26 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public interface IPushRuntime extends IFrameWriter {
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
 
-    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+    /**
+     * Sets the output frame writer of this runtime.
+     *
+     * @param index,
+     *            the index of the output channel.
+     * @param writer,
+     *            the writer for writing output.
+     * @param recordDesc,
+     *            the output record descriptor.
+     */
+    void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+    /**
+     * Sets the input record descriptor of this runtime.
+     *
+     * @param index,
+     *            the index of the input channel.
+     * @param recordDescriptor,
+     *            the corresponding input record descriptor.
+     */
+    void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
 }
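
For implementations outside this patch, the rename from setFrameWriter to setOutputFrameWriter
is mechanical. A minimal pass-through runtime against the revised interface (illustrative only;
IPushRuntime extends IFrameWriter, so the five writer methods must also be implemented):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    final class PassThroughRuntime implements IPushRuntime {
        private IFrameWriter writer;

        @Override
        public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
            this.writer = writer; // single output channel; index is always 0 here
        }

        @Override
        public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
            // a pass-through does not inspect the input layout
        }

        @Override
        public void open() throws HyracksDataException {
            writer.open();
        }

        @Override
        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
            writer.nextFrame(buffer);
        }

        @Override
        public void flush() throws HyracksDataException {
            writer.flush();
        }

        @Override
        public void fail() throws HyracksDataException {
            writer.fail();
        }

        @Override
        public void close() throws HyracksDataException {
            writer.close();
        }
    }
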
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 42e5157..e99b61b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -124,6 +124,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 if (isOpen) {
                     writer.fail();
                 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index b397f23..893aa61 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -29,6 +30,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
@@ -52,7 +54,6 @@
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
-
         final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
@@ -91,8 +92,8 @@
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
                     pipelines[i].close();
@@ -144,9 +145,13 @@
         IFrameWriter start = writer;
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        // enforce the frame writer protocol only when the job requests it
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
-            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforcePushRuntime.enforce(start) : start;
+            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
             } else {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 52245e1..8b8e320 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -28,8 +29,10 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -58,11 +61,14 @@
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
             final IFrameWriter writer) throws HyracksDataException {
-        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
-                decorFieldIdx.length, writer);
+        final RunningAggregatorOutput outputWriter =
+                new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer);
+        // enforce the frame writer protocol only when the job requests it
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
-                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx);
         }
 
         final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
@@ -136,13 +142,17 @@
 
     private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
             throws HyracksDataException {
+        // enforce the frame writer protocol only when the job requests it
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
-            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforceFrameWriter.enforce(start) : start;
+            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
             } else {
@@ -206,8 +216,9 @@
                 int start = 0;
                 int offset = 0;
                 for (int i = 0; i < fieldEnds.length; i++) {
-                    if (i > 0)
+                    if (i > 0) {
                         start = fieldEnds[i - 1];
+                    }
                     offset = fieldEnds[i] - start;
                     tb.addField(data, start, offset);
                 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 33e7c73..5cced8d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -29,7 +29,7 @@
     protected boolean failed;
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         this.writer = writer;
         this.outputRecordDesc = recordDesc;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
index e430461..d47199d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -26,7 +26,7 @@
     protected RecordDescriptor inputRecordDesc;
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         throw new IllegalStateException();
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index b7707d4..35563e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -27,20 +27,29 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // nextFrame will never be called on this runtime
         throw new UnsupportedOperationException();
     }
 
     @Override
     public void close() throws HyracksDataException {
+        // close() is a no-op since this operator completes all of its work in open()
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        writer.fail();
+        // fail() is a no-op: if a failure happened, this operator has already called fail() downstream
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // flush will never be called on this runtime
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        // setInputRecordDescriptor will never be called on this runtime since it has no input
         throw new UnsupportedOperationException();
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 4678887..d3c9951 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -81,6 +81,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 pgw.fail();
             }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1294614..f6ebf19 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -112,6 +112,7 @@
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
 
             private IFrameWriter startOfPipeline;
+            private boolean opened = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -124,6 +125,7 @@
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
                     startOfPipeline = pa.assemblePipeline(writer, ctx);
                 }
+                opened = true;
                 startOfPipeline.open();
             }
 
@@ -134,12 +136,16 @@
 
             @Override
             public void close() throws HyracksDataException {
-                startOfPipeline.close();
+                if (opened) {
+                    startOfPipeline.close();
+                }
             }
 
             @Override
             public void fail() throws HyracksDataException {
-                startOfPipeline.fail();
+                if (opened) {
+                    startOfPipeline.fail();
+                }
             }
 
             @Override
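
The opened flag is deliberately set before startOfPipeline.open(): if pipeline assembly in
open() throws, startOfPipeline may still be null and fail()/close() must not touch it, but
once open() has been attempted both calls have to reach the pipeline. Condensed (the abstract
assemblePipeline() stands in for the PipelineAssembler call above):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    abstract class GuardedPipelineSketch {
        private IFrameWriter startOfPipeline;
        private boolean opened = false;

        abstract IFrameWriter assemblePipeline() throws HyracksDataException;

        public void open() throws HyracksDataException {
            startOfPipeline = assemblePipeline(); // may throw; opened stays false
            opened = true;                        // from here on, fail/close must delegate
            startOfPipeline.open();               // if this throws, opened is already true
        }

        public void fail() throws HyracksDataException {
            if (opened) {
                startOfPipeline.fail();
            }
        }

        public void close() throws HyracksDataException {
            if (opened) {
                startOfPipeline.close();
            }
        }
    }
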
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 03e2aaf..e1081e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -19,11 +19,14 @@
 package org.apache.hyracks.algebricks.runtime.operators.meta;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 
 public class PipelineAssembler {
 
@@ -44,18 +47,21 @@
         this.outputArity = outputArity;
     }
 
-    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws
-            HyracksDataException {
+    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+        // enforce the frame writer protocol only when the job requests it
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;// this.writer;
         for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforceFrameWriter.enforce(start) : start;
             if (i == pipeline.getRuntimeFactories().length - 1) {
                 if (outputArity == 1) {
-                    newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
+                    newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor);
                 }
             } else {
-                newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
+                newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
             }
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);
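
For orientation, the loop above wires the micro-operator chain back to front: each runtime's
output writer is the stage wired in the previous iteration (or the outer writer for the last
factory), and its input descriptor is the preceding stage's output layout. Stripped of the
enforcement wrapping and simplified to a single-output pipeline, the wiring reduces to (class
name illustrative):

    import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
    import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    final class WiringSketch {
        // Returns the head of the chain; pushing frames into it drives the whole pipeline.
        static IFrameWriter wire(AlgebricksPipeline pipeline, IFrameWriter out,
                RecordDescriptor outDesc, IHyracksTaskContext ctx) throws HyracksDataException {
            IFrameWriter start = out;
            RecordDescriptor[] descs = pipeline.getRecordDescriptors();
            for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
                IPushRuntime r = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
                r.setOutputFrameWriter(0, start, i == descs.length - 1 ? outDesc : descs[i]);
                if (i > 0) {
                    r.setInputRecordDescriptor(0, descs[i - 1]);
                } // i == 0: the pipeline's external input descriptor would be set here
                start = r;
            }
            return start;
        }
    }
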
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 3e30f73..925ff93 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -79,11 +79,6 @@
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                writer.fail();
-            }
-
-            @Override
             public void close() throws HyracksDataException {
                 try {
                     frameSorter.sort();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index e123adf..496679f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -65,11 +65,6 @@
         }
 
         @Override
-        public void fail() throws HyracksDataException {
-            writer.fail();
-        }
-
-        @Override
         public void flush() throws HyracksDataException {
             writer.flush();
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 38fe7d1..b3e2b64 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -117,8 +117,9 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 if (isOpen) {
-                    super.fail();
+                    writer.fail();
                 }
             }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index ebf3d3a..55146e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -81,6 +81,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        // fail() is a no-op; cleanup happens in close()
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 0f57fd7..d286c3b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -108,8 +108,9 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 if (isOpen) {
-                    super.fail();
+                    writer.fail();
                 }
             }
 
@@ -118,6 +119,9 @@
                 if (isOpen) {
                     try {
                         flushIfNotFailed();
+                    } catch (Throwable th) {
+                        writer.fail();
+                        throw th;
                     } finally {
                         writer.close();
                     }
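
Taken together, the failed flag and the new catch clause cover both ends of the lifecycle:
fail() records the failure even when the downstream writer was never opened, and a flush that
throws during close() still propagates fail() downstream before the writer is closed. The
pair, condensed (flushIfNotFailed() abstracts the base-class helper that consults failed):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    abstract class FailAwareCloseSketch {
        protected IFrameWriter writer;
        protected boolean isOpen;
        protected boolean failed;

        abstract void flushIfNotFailed() throws HyracksDataException;

        public void fail() throws HyracksDataException {
            failed = true;        // recorded even if downstream was never opened
            if (isOpen) {
                writer.fail();
            }
        }

        public void close() throws HyracksDataException {
            if (isOpen) {
                try {
                    flushIfNotFailed();   // a no-op once failed == true
                } catch (Throwable th) {
                    writer.fail();        // a failing flush must still fail downstream
                    throw th;
                } finally {
                    writer.close();
                }
            }
        }
    }
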
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa9232e..e2868ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -177,7 +177,7 @@
         }
 
         public StartJobFunction(JobId jobId) {
-            this(null, null, null, jobId);
+            this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
         }
 
         public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
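
Passing EnumSet.noneOf(JobFlag.class) instead of null lets every consumer call
jobFlags.contains(...) without a null check; it is the canonical empty set for an enum type:

    import java.util.EnumSet;

    import org.apache.hyracks.api.job.JobFlag;

    final class EmptyFlagsSketch {
        public static void main(String[] args) {
            EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
            // safe without a null check, and simply false for an empty set
            System.out.println(flags.contains(JobFlag.ENFORCE_CONTRACT)); // prints false
        }
    }
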
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 4b3aff2..4f37b90 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -102,14 +102,14 @@
 
     @Override
     public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
-                jobSpec);
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return startJob(jsacggf, jobFlags);
     }
 
     @Override
     public JobId distributeJob(JobSpecification jobSpec) throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+        IActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return distributeJob(jsacggf);
     }
@@ -206,15 +206,14 @@
     @Override
     public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
             throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
-                jobSpec);
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return startJob(deploymentId, jsacggf, jobFlags);
     }
 
     @Override
     public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
-            EnumSet<JobFlag> jobFlags)
-            throws Exception {
+            EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index c8e4cf8..df693b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.context;
 
 import java.io.Serializable;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -26,6 +27,7 @@
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
 
@@ -48,4 +50,6 @@
     void setSharedObject(Object object);
 
     Object getSharedObject();
+
+    Set<JobFlag> getJobFlags();
 }
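
With getJobFlags() available on the task context, any operator, not just the Algebricks
pipeline assemblers, can make the enforcement decision locally. Typical use (sketch; the
class and method names are illustrative):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
    import org.apache.hyracks.api.job.JobFlag;

    final class FlagCheckSketch {
        static IFrameWriter maybeEnforce(IHyracksTaskContext ctx, IFrameWriter writer) {
            return ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT)
                    ? EnforceFrameWriter.enforce(writer) : writer;
        }
    }
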
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
new file mode 100644
index 0000000..7dd50f1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class EnforceFrameWriter implements IFrameWriter {
+
+    private static final Logger LOGGER = Logger.getLogger(EnforceFrameWriter.class.getName());
+    private final IFrameWriter writer;
+    private boolean failed = false;
+    private Throwable failure = null;
+
+    protected EnforceFrameWriter(IFrameWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public final void open() throws HyracksDataException {
+        try {
+            failed = true;
+            writer.open();
+            failed = false;
+        } catch (Throwable th) {
+            failure = th;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (failed) {
+            HyracksDataException hde = HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER);
+            hde.addSuppressed(failure);
+            LOGGER.log(Level.WARNING, "nextFrame called on a failed writer", hde);
+            throw hde;
+        }
+        try {
+            failed = true;
+            writer.nextFrame(buffer);
+            failed = false;
+        } catch (Throwable th) {
+            failure = th;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void flush() throws HyracksDataException {
+        if (failed) {
+            HyracksDataException hde = HyracksDataException.create(ErrorCode.FLUSH_ON_FAILED_WRITER);
+            hde.addSuppressed(failure);
+            LOGGER.log(Level.WARNING, "flush called on a failed writer", hde);
+            throw hde;
+        }
+        try {
+            failed = true;
+            writer.flush();
+            failed = false;
+        } catch (Throwable th) {
+            failure = th;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void fail() throws HyracksDataException {
+        failed = true;
+        if (failure == null) {
+            failure = new Exception("Failed signal from upstream");
+        }
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    public static IFrameWriter enforce(IFrameWriter writer) {
+        return writer instanceof EnforceFrameWriter ? writer : new EnforceFrameWriter(writer);
+    }
+}
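
Behaviorally, the wrapper turns the writer contract into a checked state machine: every entry
into open/nextFrame/flush pessimistically marks the writer failed and clears the mark only on
normal return, so a later nextFrame() or flush() on a failed writer raises
NEXT_FRAME_ON_FAILED_WRITER or FLUSH_ON_FAILED_WRITER with the original failure attached as a
suppressed exception. A small demonstration against a no-op stub (the stub is illustrative):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    final class EnforceDemo {
        public static void main(String[] args) throws HyracksDataException {
            IFrameWriter stub = new IFrameWriter() {
                @Override
                public void open() {
                }

                @Override
                public void nextFrame(ByteBuffer buffer) {
                }

                @Override
                public void flush() {
                }

                @Override
                public void fail() {
                }

                @Override
                public void close() {
                }
            };
            IFrameWriter enforced = EnforceFrameWriter.enforce(stub);
            enforced.open();
            enforced.fail(); // marks the writer failed
            try {
                enforced.nextFrame(ByteBuffer.allocate(0)); // protocol violation
            } catch (HyracksDataException e) {
                System.out.println("caught expected: " + e);
            } finally {
                enforced.close();
            }
        }
    }
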
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index 9148d6b..fc46b9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -20,12 +20,12 @@
 
 import java.io.Serializable;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.constraints.IConstraintAcceptor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * Descriptor for operators in Hyracks.
  *
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
index f6c201e..82433e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -23,16 +23,15 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IOperatorNodePushable {
-    public void initialize() throws HyracksDataException;
+    void initialize() throws HyracksDataException;
 
-    public void deinitialize() throws HyracksDataException;
+    void deinitialize() throws HyracksDataException;
 
-    public int getInputArity();
+    int getInputArity();
 
-    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
-            throws HyracksDataException;
+    void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException;
 
-    public IFrameWriter getInputFrameWriter(int index);
+    IFrameWriter getInputFrameWriter(int index);
 
-    public String getDisplayName();
+    String getDisplayName();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 3c70dba..92fe096 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -91,6 +91,8 @@
     public static final int RESOURCE_DOES_NOT_EXIST = 55;
     public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
     public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
+    public static final int NEXT_FRAME_ON_FAILED_WRITER = 58;
+    public static final int FLUSH_ON_FAILED_WRITER = 59;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index a33c6c9a..7225cd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,5 +19,6 @@
 package org.apache.hyracks.api.job;
 
 public enum JobFlag {
-    PROFILE_RUNTIME
+    PROFILE_RUNTIME,
+    ENFORCE_CONTRACT
 }
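
Clients opt in per job by passing the new flag at submission time. A hypothetical client-side sketch; the endpoint coordinates and the job-spec construction are assumptions, the point is where ENFORCE_CONTRACT is supplied:

    import java.util.EnumSet;

    import org.apache.hyracks.api.client.HyracksConnection;
    import org.apache.hyracks.api.client.IHyracksClientConnection;
    import org.apache.hyracks.api.job.JobFlag;
    import org.apache.hyracks.api.job.JobId;
    import org.apache.hyracks.api.job.JobSpecification;

    public class EnforcedJobClientSketch {
        public static void runEnforced(JobSpecification spec) throws Exception {
            // "localhost"/1098 are placeholder cluster-controller coordinates.
            IHyracksClientConnection hcc = new HyracksConnection("localhost", 1098);
            JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.ENFORCE_CONTRACT));
            hcc.waitForCompletion(jobId);
        }
    }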
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index 38e7fbe..21fb985 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -29,7 +29,6 @@
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.IActivity;
@@ -50,7 +49,8 @@
  * @author yingyib
  */
 public class ActivityClusterGraphRewriter {
-    private static String ONE_TO_ONE_CONNECTOR = "org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor";
+    private static String ONE_TO_ONE_CONNECTOR =
+            "org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor";
 
     /**
      * rewrite an activity cluster graph to eliminate
@@ -82,22 +82,22 @@
     private void rewriteInterActivityCluster(ActivityCluster ac,
             Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
         Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
-        Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
+        Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<>();
         for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
             invertedAid2SuperAidMap.put(entry.getKey().getActivityId(), entry.getValue().getActivityId());
         }
-        Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+        Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<>();
         for (Entry<ActivityId, Set<ActivityId>> entry : blocked2BlockerMap.entrySet()) {
             ActivityId blocked = entry.getKey();
             ActivityId replacedBlocked = invertedAid2SuperAidMap.get(blocked);
             Set<ActivityId> blockers = entry.getValue();
             Set<ActivityId> replacedBlockers = null;
             if (blockers != null) {
-                replacedBlockers = new HashSet<ActivityId>();
+                replacedBlockers = new HashSet<>();
                 for (ActivityId blocker : blockers) {
                     replacedBlockers.add(invertedAid2SuperAidMap.get(blocker));
-                    ActivityCluster dependingAc = ac.getActivityClusterGraph().getActivityMap()
-                            .get(invertedAid2SuperAidMap.get(blocker));
+                    ActivityCluster dependingAc =
+                            ac.getActivityClusterGraph().getActivityMap().get(invertedAid2SuperAidMap.get(blocker));
                     if (!ac.getDependencies().contains(dependingAc)) {
                         ac.getDependencies().add(dependingAc);
                     }
@@ -128,12 +128,12 @@
         Map<ActivityId, IActivity> activities = ac.getActivityMap();
         Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
         Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
-        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap = ac
-                .getConnectorActivityMap();
+        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap =
+                ac.getConnectorActivityMap();
         ActivityClusterGraph acg = ac.getActivityClusterGraph();
-        Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+        Map<ActivityId, IActivity> startActivities = new HashMap<>();
         Map<ActivityId, SuperActivity> superActivities = new HashMap<ActivityId, SuperActivity>();
-        Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<ActivityId, Queue<IActivity>>();
+        Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<>();
 
         /**
          * Build the initial super activities
@@ -185,8 +185,8 @@
                 List<IConnectorDescriptor> outputConnectors = activityOutputMap.get(expendingActivity.getActivityId());
                 if (outputConnectors != null) {
                     for (IConnectorDescriptor outputConn : outputConnectors) {
-                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = connectorActivityMap
-                                .get(outputConn.getConnectorId());
+                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints =
+                                connectorActivityMap.get(outputConn.getConnectorId());
                         IActivity newActivity = endPoints.getRight().getLeft();
                         SuperActivity existingSuperActivity = invertedActivitySuperActivityMap.get(newActivity);
                         if (outputConn.getClass().getName().contains(ONE_TO_ONE_CONNECTOR)) {
@@ -343,7 +343,7 @@
         SuperActivity superActivity = new SuperActivity(acg.getActivityClusterGraph(), acg.getId(), activityId);
         superActivities.put(activityId, superActivity);
         superActivity.addActivity(activity);
-        Queue<IActivity> toBeExpended = new LinkedList<IActivity>();
+        Queue<IActivity> toBeExpended = new LinkedList<>();
         toBeExpended.add(activity);
         toBeExpendedMap.put(activityId, toBeExpended);
         invertedActivitySuperActivityMap.put(activity, superActivity);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
index 28e098f..476a744 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -25,7 +25,6 @@
 import java.util.Map.Entry;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivity;
@@ -58,7 +57,7 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+        final Map<ActivityId, IActivity> startActivities = new HashMap<>();
         Map<ActivityId, IActivity> activities = getActivityMap();
         for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
             /**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 314bf8b..7fdf106 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,12 +35,14 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -90,18 +92,18 @@
     private void init() throws HyracksDataException {
         Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
         List<IConnectorDescriptor> outputConnectors;
-
+        final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         /*
          * Set up the source operators
          */
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
-                    nPartitions);
+            IOperatorNodePushable opPushable =
+                    entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
             inputArity += opPushable.getInputArity();
-            outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
-                    Collections.emptyList());
+            outputConnectors =
+                    MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList());
             for (IConnectorDescriptor conn : outputConnectors) {
                 childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
             }
@@ -131,7 +133,9 @@
             /*
              * construct the dataflow connection from a producer to a consumer
              */
-            sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+            IFrameWriter writer = destOp.getInputFrameWriter(inputChannel);
+            writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+            sourceOp.setOutputFrameWriter(outputChannel, writer,
                     recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
 
             /*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 604b534..8add407 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -76,4 +76,7 @@
 55 = Resource does not exist for %1$s
 56 = LSM disk component scan is not allowed for a secondary index
 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
+58 = nextFrame called on a failed writer
+59 = flush called on a failed writer
+
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
index 57b8c50..3f46316 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/ActivityClusterPlanner.java
@@ -48,9 +48,9 @@
 import org.apache.hyracks.control.cc.job.ActivityClusterPlan;
 import org.apache.hyracks.control.cc.job.ActivityPlan;
 import org.apache.hyracks.control.cc.job.JobRun;
-import org.apache.hyracks.control.cc.job.Task;
 import org.apache.hyracks.control.cc.job.TaskCluster;
 import org.apache.hyracks.control.cc.job.TaskClusterId;
+import org.apache.hyracks.control.cc.job.Task;
 
 class ActivityClusterPlanner {
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
@@ -102,12 +102,10 @@
                     ActivityCluster dAC = ac.getActivityClusterGraph().getActivityMap().get(danId);
                     ActivityClusterPlan dACP = jobRun.getActivityClusterPlanMap().get(dAC.getId());
                     assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for "
-                            + "dependency AC: Encountered no plan for ActivityID "
-                            + danId;
+                            + "dependency AC: Encountered no plan for ActivityID " + danId;
                     Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
                     assert dATasks != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for"
-                            + " dependency AC: Encountered no plan for ActivityID "
-                            + danId;
+                            + " dependency AC: Encountered no plan for ActivityID " + danId;
                     assert dATasks.length == tasks.length : "Dependency activity partitioned differently from "
                             + "dependent: " + dATasks.length + " != " + tasks.length;
                     Task dTask = dATasks[i];
@@ -125,8 +123,8 @@
     private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
             Map<ActivityId, ActivityPlan> activityPlanMap) {
         Set<ActivityId> activities = ac.getActivityMap().keySet();
-        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
-                activityPlanMap, activities);
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity =
+                computeTaskConnectivity(jobRun, activityPlanMap, activities);
 
         TaskCluster[] taskClusters = ac.getActivityClusterGraph().isUseConnectorPolicyForScheduling()
                 ? buildConnectorPolicyAwareTaskClusters(ac, activityPlanMap, taskConnectivity)
@@ -139,13 +137,13 @@
                 List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
                 if (cInfoList != null) {
                     for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
-                        Task targetTS = activityPlanMap.get(p.getLeft().getActivityId()).getTasks()[p.getLeft()
-                                .getPartition()];
+                        Task targetTS =
+                                activityPlanMap.get(p.getLeft().getActivityId()).getTasks()[p.getLeft().getPartition()];
                         TaskCluster targetTC = targetTS.getTaskCluster();
                         if (targetTC != tc) {
                             ConnectorDescriptorId cdId = p.getRight();
-                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(), p.getLeft()
-                                    .getPartition());
+                            PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
+                                    p.getLeft().getPartition());
                             tc.getProducedPartitions().add(pid);
                             targetTC.getRequiredPartitions().add(pid);
                             partitionProducingTaskClusterMap.put(pid, tc);
@@ -170,8 +168,8 @@
             Task[] tasks = ap.getTasks();
             taskStates.addAll(Arrays.asList(tasks));
         }
-        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac, taskStates.toArray(new Task[taskStates
-                .size()]));
+        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac,
+                taskStates.toArray(new Task[taskStates.size()]));
         for (Task t : tc.getTasks()) {
             t.setTaskCluster(tc);
         }
@@ -209,8 +207,8 @@
                 }
                 for (int i = 0; i < nProducers; ++i) {
                     c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                    List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity
-                            .get(ac1TaskStates[i].getTaskId());
+                    List<Pair<TaskId, ConnectorDescriptorId>> cInfoList =
+                            taskConnectivity.get(ac1TaskStates[i].getTaskId());
                     if (cInfoList == null) {
                         cInfoList = new ArrayList<>();
                         taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
@@ -358,9 +356,9 @@
 
                 int[] fanouts = new int[nProducers];
                 if (c.allProducersToAllConsumers()) {
-                        for (int i = 0; i < nProducers; ++i) {
-                            fanouts[i] = nConsumers;
-                        }
+                    for (int i = 0; i < nProducers; ++i) {
+                        fanouts[i] = nConsumers;
+                    }
                 } else {
                     for (int i = 0; i < nProducers; ++i) {
                         c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
@@ -402,8 +400,8 @@
                 throw new HyracksException("No value found for " + lv);
             }
             if (!(value instanceof Number)) {
-                throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
-                        + value + ")");
+                throw new HyracksException(
+                        "Unexpected type of value bound to " + lv + ": " + value.getClass() + "(" + value + ")");
             }
             int nParts = ((Number) value).intValue();
             if (nParts <= 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 55a7a82..95a6d9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -116,10 +115,10 @@
     }
 
     //Run a Pre-distributed job by passing the JobId
-    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
             PreDistributedJobDescriptor distributedJobDescriptor)
             throws HyracksException {
-        this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
+        this(deploymentId, jobId, jobFlags,
                 distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
         Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
         this.scheduler = new JobExecutor(ccs, this, constaints, true);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 2dbb631..e083d2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -68,7 +68,7 @@
                 run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
             } else {
                 //ActivityClusterGraph has already been distributed
-                run = new JobRun(ccs, deploymentId, jobId,
+                run = new JobRun(ccs, deploymentId, jobId, jobFlags,
                         ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
             }
             jobManager.add(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df3a..cbc6146 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
@@ -64,7 +65,8 @@
         CLUSTER_TOPOLOGY(STRING),
         JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
         JOB_QUEUE_CAPACITY(INTEGER, 4096),
-        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
+        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
+        ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
 
         private final IOptionType parser;
         private Object defaultValue;
@@ -156,6 +158,9 @@
                     return "The maximum number of jobs to queue before rejecting new jobs";
                 case JOB_MANAGER_CLASS:
                     return "Specify the implementation class name for the job manager";
+                case ENFORCE_FRAME_WRITER_PROTOCOL:
+                    return "A flag indicating if runtime should enforce frame writer protocol and detect "
+                            + "bad behaving operators";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -357,4 +362,12 @@
     public int getJobQueueCapacity() {
         return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY);
     }
+
+    public boolean getEnforceFrameWriterProtocol() {
+        return getAppConfig().getBoolean(Option.ENFORCE_FRAME_WRITER_PROTOCOL);
+    }
+
+    public void setEnforceFrameWriterProtocol(boolean enforce) {
+        configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
+    }
 }
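
A two-line sketch of toggling the option programmatically on an embedded cluster controller (the no-argument CCConfig constructor is assumed here); how the controller consumes the flag is outside this excerpt:

    CCConfig ccConfig = new CCConfig(); // default ConfigManager assumed
    ccConfig.setEnforceFrameWriterProtocol(true);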
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 86cc19e..caa3210 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -65,8 +65,10 @@
         MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
         MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
         CLUSTER_CONNECT_RETRIES(INTEGER, 5),
-        IODEVICES(STRING_ARRAY, appConfig -> new String[] {
-                FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
+        IODEVICES(
+                STRING_ARRAY,
+                appConfig -> new String[] {
+                        FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
                 "<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/iodevice"),
         NET_THREAD_COUNT(INTEGER, 1),
         NET_BUFFER_COUNT(INTEGER, 1),
@@ -97,7 +99,7 @@
         }
 
         <T> Option(IOptionType<T> parser, Function<IApplicationConfig, T> defaultValue,
-                   String defaultValueDescription) {
+                String defaultValueDescription) {
             this.parser = parser;
             this.defaultValue = defaultValue;
             this.defaultValueDescription = defaultValueDescription;
@@ -252,6 +254,7 @@
         return configManager;
     }
 
+    @Override
     public IApplicationConfig getAppConfig() {
         return appConfig;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7224b49..2ab9d9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -48,6 +48,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -98,9 +99,13 @@
 
     private Object sharedObject;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
-            NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
+    private final Set<JobFlag> jobFlags;
+
+    public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
+            ExecutorService executor, NodeControllerService ncs,
+            List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
+        this.jobFlags = jobFlags;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
         this.executorService = executor;
@@ -411,4 +416,9 @@
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set<JobFlag> getJobFlags() {
+        return jobFlags;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..95f3e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -131,12 +132,10 @@
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+                task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
-
                 List<IPartitionCollector> collectors = new ArrayList<>();
-
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
                         IConnectorDescriptor conn = inputs.get(i);
@@ -145,26 +144,28 @@
                             LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
                         RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
-                        IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
-                                recordDesc, cPolicy);
+                        IPartitionCollector collector =
+                                createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
                         collectors.add(collector);
                     }
                 }
                 List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
                 if (outputs != null) {
+                    final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
                     for (int i = 0; i < outputs.size(); ++i) {
                         final IConnectorDescriptor conn = outputs.get(i);
                         RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                         IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
 
-                        IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
-                                partition, taId, flags);
+                        IPartitionWriterFactory pwFactory =
+                                createPartitionWriterFactory(task, cPolicy, jobId, conn, partition, taId, flags);
 
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + conn.getConnectorId());
                         }
-                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
-                                .getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
                         operator.setOutputFrameWriter(i, writer, recordDesc);
                     }
                 }
@@ -203,11 +204,11 @@
     private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
             int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
             throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
-                .getInputPartitionCounts()[i], td.getPartitionCount());
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+                td.getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
-                    .getTaskAttemptId(), ncs.getExecutor());
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+                    task.getTaskAttemptId(), ncs.getExecutor());
         } else {
             return collector;
         }
@@ -222,8 +223,9 @@
                 factory = new IPartitionWriterFactory() {
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
                     }
                 };
             } else {
@@ -231,9 +233,9 @@
 
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
-                                        .getExecutor());
+                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
                     }
                 };
             }
@@ -241,8 +243,8 @@
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
-                            .getConnectorId(), senderIndex, receiverIndex), taId);
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(),
+                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId);
                 }
             };
         }
@@ -272,11 +274,14 @@
                 if (inputAddresses[i] != null) {
                     for (int j = 0; j < inputAddresses[i].length; j++) {
                         NetworkAddress networkAddress = inputAddresses[i][j];
-                        PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
-                                .getTaskAttemptId().getTaskId().getPartition());
-                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
-                                .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
-                                        .lookupIpAddress()), networkAddress.getPort()), pid, 5));
+                        PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j,
+                                td.getTaskAttemptId().getTaskId().getPartition());
+                        PartitionChannel channel = new PartitionChannel(pid,
+                                new NetworkInputChannel(ncs.getNetworkManager(),
+                                        new InetSocketAddress(
+                                                InetAddress.getByAddress(networkAddress.lookupIpAddress()),
+                                                networkAddress.getPort()),
+                                        pid, 5));
                         channels.add(channel);
                     }
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
index 4a2021f..8472479 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java
@@ -58,7 +58,7 @@
      * @param ctx
      * @param object
      */
-    public static void putInSharedMap(String key, Object object, IHyracksTaskContext ctx) {
+    public static void put(String key, Object object, IHyracksTaskContext ctx) {
         TaskUtil.getSharedMap(ctx, true).put(key, object);
     }
 
@@ -74,4 +74,18 @@
         Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false);
         return sharedMap == null ? null : (T) sharedMap.get(key);
     }
+
+    /**
+     * Get an object of type T from the shared map of the task, returning the default if the key is not found.
+     *
+     * @param key
+     * @param ctx
+     * @param defaultValue
+     *
+     * @return the value associated with the key if found, defaultValue otherwise
+     */
+    public static <T> T get(String key, IHyracksTaskContext ctx, T defaultValue) {
+        T value = get(key, ctx);
+        return value == null ? defaultValue : value;
+    }
 }
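
A hypothetical usage sketch of the renamed put(...) and the new default-valued get(...); the key names and the Integer payload are illustrative only:

    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.dataflow.common.utils.TaskUtil;

    public class TaskUtilSketch {
        static void example(IHyracksTaskContext ctx) {
            TaskUtil.put("sort.budget", 128, ctx);
            int budget = TaskUtil.get("sort.budget", ctx, 32);  // key present
            int missing = TaskUtil.get("absent.key", ctx, 32);  // default used
            System.out.println(budget + " " + missing);         // -> "128 32"
        }
    }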
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index a83273a..e338961 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.dataflow.std.base;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.constraints.IConstraintAcceptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -27,6 +25,9 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 5de272a..8b9fe00 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -110,8 +110,7 @@
     private int[] probePSizeInTups;
 
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
-            String probeRelName,
-            String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators,
+            String probeRelName, String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators,
             RecordDescriptor probeRd, RecordDescriptor buildRd, ITuplePartitionComputer probeHpc,
             ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, boolean isLeftOuter,
             IMissingWriterFactory[] nullWriterFactories1) {
@@ -223,7 +222,8 @@
         return writer;
     }
 
-    public void closeBuild() throws HyracksDataException {
+    public void closeBuild(boolean isFailed) throws HyracksDataException {
+        // TODO: Use the isFailed flag to simply clean up in case of failure
         // Flushes the remaining chunks of the all spilled partitions to the disk.
         closeAllSpilledPartitions(SIDE.BUILD);
 
@@ -259,8 +259,8 @@
                 break;
         }
         try {
-            for (int pid = spilledStatus.nextSetBit(0); pid >= 0
-                    && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+            for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
+                    spilledStatus.nextSetBit(pid + 1)) {
                 if (bufferManager.getNumTuples(pid) > 0) {
                     bufferManager.flushPartition(pid, getSpillWriterOrCreateNewOneIfNotExist(pid, whichSide));
                     bufferManager.clearPartition(pid);
@@ -293,16 +293,15 @@
 
         // For partitions in main memory, we deduct their size from the free space.
         int inMemTupCount = 0;
-        for (int p = spilledStatus.nextClearBit(0); p >= 0
-                && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+        for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+                spilledStatus.nextClearBit(p + 1)) {
             freeSpace -= bufferManager.getPhysicalSize(p);
             inMemTupCount += buildPSizeInTups[p];
         }
 
         // Calculates the expected hash table size for the given number of tuples in main memory
         // and deducts it from the free space.
-        long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
-                frameSize);
+        long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
         freeSpace -= hashTableByteSizeForInMemTuples;
 
         // In the case where free space is less than zero after considering the hash table size,
@@ -317,8 +316,9 @@
             int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
             if (pidToSpill >= 0) {
                 // There is a suitable one. We spill that partition to the disk.
-                long hashTableSizeDecrease = -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
-                        inMemTupCount, -buildPSizeInTups[pidToSpill], frameSize);
+                long hashTableSizeDecrease =
+                        -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
+                                -buildPSizeInTups[pidToSpill], frameSize);
                 freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
                 inMemTupCount -= buildPSizeInTups[pidToSpill];
                 spillPartition(pidToSpill);
@@ -327,8 +327,8 @@
             } else {
                 // There is no single suitable partition. So, we need to spill multiple partitions to the disk
                 // in order to accommodate the hash table.
-                for (int p = spilledStatus.nextClearBit(0); p >= 0
-                        && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+                for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+                        spilledStatus.nextClearBit(p + 1)) {
                     int spaceToBeReturned = bufferManager.getPhysicalSize(p);
                     int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
                     if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
@@ -340,9 +340,9 @@
                     // Since the number of tuples in memory has been decreased,
                     // the hash table size will be decreased, too.
                     // We put minus since the method returns a negative value to represent a newly reclaimed space.
-                    long expectedHashTableSizeDecrease = -SerializableHashTable
-                            .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, -numberOfTuplesToBeSpilled,
-                                    frameSize);
+                    long expectedHashTableSizeDecrease =
+                            -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(inMemTupCount,
+                                    -numberOfTuplesToBeSpilled, frameSize);
                     freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
                     // Adjusts the hash table size
                     inMemTupCount -= numberOfTuplesToBeSpilled;
@@ -356,8 +356,7 @@
         // If more partitions have been spilled to the disk, calculate the expected hash table size again
         // before bringing some partitions to main memory.
         if (moreSpilled) {
-            hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
-                    frameSize);
+            hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount, frameSize);
         }
 
         // Brings back some partitions if there is enough free space.
@@ -387,8 +386,8 @@
         long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
         int minSpaceAfterSpillPartID = -1;
 
-        for (int p = spilledStatus.nextClearBit(0); p >= 0
-                && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+        for (int p = spilledStatus.nextClearBit(0); p >= 0 && p < numOfPartitions; p =
+                spilledStatus.nextClearBit(p + 1)) {
             if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
                 continue;
             }
@@ -408,8 +407,8 @@
     }
 
     private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
-        for (int i = spilledStatus.nextSetBit(pid); i >= 0
-                && i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
+        for (int i = spilledStatus.nextSetBit(pid); i >= 0 && i < numOfPartitions; i =
+                spilledStatus.nextSetBit(i + 1)) {
             int spilledTupleCount = buildPSizeInTups[i];
             // Expected hash table size increase after reloading this partition
             long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
@@ -632,8 +631,8 @@
         buf.append("(A) Spilled partitions" + "\n");
         int spilledTupleCount = 0;
         int spilledPartByteSize = 0;
-        for (int pid = spilledStatus.nextSetBit(0); pid >= 0
-                && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+        for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
+                spilledStatus.nextSetBit(pid + 1)) {
             if (whichSide == SIDE.BUILD) {
                 spilledTupleCount += buildPSizeInTups[pid];
                 spilledPartByteSize += buildRFWriters[pid].getFileSize();
@@ -653,8 +652,8 @@
         buf.append("(B) In-memory partitions" + "\n");
         int inMemoryTupleCount = 0;
         int inMemoryPartByteSize = 0;
-        for (int pid = spilledStatus.nextClearBit(0); pid >= 0
-                && pid < numOfPartitions; pid = spilledStatus.nextClearBit(pid + 1)) {
+        for (int pid = spilledStatus.nextClearBit(0); pid >= 0 && pid < numOfPartitions; pid =
+                spilledStatus.nextClearBit(pid + 1)) {
             if (whichSide == SIDE.BUILD) {
                 inMemoryTupleCount += buildPSizeInTups[pid];
                 inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index c44c583..e7dfaaf 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -268,17 +268,19 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
-            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
-                    : predEvaluatorFactory.createPredicateEvaluator());
+            final IPredicateEvaluator predEvaluator =
+                    predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
 
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
                         ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
 
-                ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
-                        hashFunctionGeneratorFactories).createPartitioner(0);
-                ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
-                        hashFunctionGeneratorFactories).createPartitioner(0);
+                ITuplePartitionComputer probeHpc =
+                        new FieldHashPartitionComputerFamily(probeKeys, hashFunctionGeneratorFactories)
+                                .createPartitioner(0);
+                ITuplePartitionComputer buildHpc =
+                        new FieldHashPartitionComputerFamily(buildKeys, hashFunctionGeneratorFactories)
+                                .createPartitioner(0);
                 boolean isFailed = false;
 
                 @Override
@@ -287,8 +289,8 @@
                         throw new HyracksDataException("Not enough memory is assigend for Hybrid Hash Join.");
                     }
                     state.memForJoin = memSizeInFrames - 2;
-                    state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
-                            nPartitions);
+                    state.numOfPartitions =
+                            getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
                     state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
                             PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
                             buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories);
@@ -307,7 +309,7 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    state.hybridHJ.closeBuild();
+                    state.hybridHJ.closeBuild(isFailed);
                     if (isFailed) {
                         state.hybridHJ.clearBuildTempFiles();
                     } else {
@@ -324,7 +326,6 @@
                 }
 
             };
-            return op;
         }
     }
 
@@ -355,21 +356,21 @@
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            final ITuplePairComparator nljComparatorProbe2Build = tuplePairComparatorFactoryProbe2Build
-                    .createTuplePairComparator(ctx);
-            final ITuplePairComparator nljComparatorBuild2Probe = tuplePairComparatorFactoryBuild2Probe
-                    .createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = predEvaluatorFactory == null ? null
-                    : predEvaluatorFactory.createPredicateEvaluator();
+            final ITuplePairComparator nljComparatorProbe2Build =
+                    tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
+            final ITuplePairComparator nljComparatorBuild2Probe =
+                    tuplePairComparatorFactoryBuild2Probe.createTuplePairComparator(ctx);
+            final IPredicateEvaluator predEvaluator =
+                    predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
 
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
 
-            final IMissingWriter[] nonMatchWriter = isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length]
-                    : null;
-            final ArrayTupleBuilder nullTupleBuild = isLeftOuter ? new ArrayTupleBuilder(buildRd.getFieldCount())
-                    : null;
+            final IMissingWriter[] nonMatchWriter =
+                    isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
+            final ArrayTupleBuilder nullTupleBuild =
+                    isLeftOuter ? new ArrayTupleBuilder(buildRd.getFieldCount()) : null;
             if (isLeftOuter) {
                 DataOutput out = nullTupleBuild.getDataOutput();
                 for (int i = 0; i < nonMatchWriterFactories.length; i++) {
@@ -379,7 +380,7 @@
                 }
             }
 
-            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
                 private IFrame rPartbuff = new VSizeFrame(ctx);
 
@@ -432,8 +433,8 @@
                         }
                         BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
                         rPartbuff.reset();
-                        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
-                                .nextSetBit(pid + 1)) {
+                        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid =
+                                partitionStatus.nextSetBit(pid + 1)) {
                             RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
                             RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
 
@@ -474,10 +475,12 @@
                 //The buildSideReader should always be the original buildSideReader, and so should the probeSideReader
                 private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader,
                         int buildSizeInTuple, int probeSizeInTuple, int level) throws HyracksDataException {
-                    ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
-                            hashFunctionGeneratorFactories).createPartitioner(level);
-                    ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
-                            hashFunctionGeneratorFactories).createPartitioner(level);
+                    ITuplePartitionComputer probeHpc =
+                            new FieldHashPartitionComputerFamily(probeKeys, hashFunctionGeneratorFactories)
+                                    .createPartitioner(level);
+                    ITuplePartitionComputer buildHpc =
+                            new FieldHashPartitionComputerFamily(buildKeys, hashFunctionGeneratorFactories)
+                                    .createPartitioner(level);
 
                     int frameSize = ctx.getInitialFrameSize();
                     long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
@@ -492,10 +495,10 @@
                     }
 
                     // Calculate the expected hash table size for the both side.
-                    long expectedHashTableSizeForBuildInFrame = SerializableHashTable
-                            .getExpectedTableFrameCount(buildSizeInTuple, frameSize);
-                    long expectedHashTableSizeForProbeInFrame = SerializableHashTable
-                            .getExpectedTableFrameCount(probeSizeInTuple, frameSize);
+                    long expectedHashTableSizeForBuildInFrame =
+                            SerializableHashTable.getExpectedTableFrameCount(buildSizeInTuple, frameSize);
+                    long expectedHashTableSizeForProbeInFrame =
+                            SerializableHashTable.getExpectedTableFrameCount(probeSizeInTuple, frameSize);
 
                     //Apply in-Mem HJ if possible
                     if (!skipInMemoryHJ && ((buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin)
@@ -580,15 +583,19 @@
                     rHHj.setIsReversed(isReversed);
                     try {
                         buildSideReader.open();
+                        boolean isFailed = false;
                         try {
                             rHHj.initBuild();
                             rPartbuff.reset();
                             while (buildSideReader.nextFrame(rPartbuff)) {
                                 rHHj.build(rPartbuff.getBuffer());
                             }
+                        } catch (Throwable th) {
+                            isFailed = true;
+                            throw th;
                         } finally {
                             // Makes sure that files are always properly closed.
-                            rHHj.closeBuild();
+                            rHHj.closeBuild(isFailed);
                         }
                     } finally {
                         buildSideReader.close();
@@ -645,9 +652,8 @@
 
                         } else { //Case 2.1.2 - Switch to NLJ
                             if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine(
-                                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
-                                                + "(isLeftOuter || build<probe) - [Level " + level + "]");
+                                LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
+                                        + "(isLeftOuter || build<probe) - [Level " + level + "]");
                             }
                             for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                                 RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
@@ -695,8 +701,8 @@
                             probeTupleAccessor.reset(rPartbuff.getBuffer());
                             for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
                                 FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
-                                    nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
-                                    nullTupleBuild.getSize());
+                                        nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
+                                        nullTupleBuild.getSize());
                             }
                         }
                         nullResultAppender.write(writer, true);
@@ -712,8 +718,8 @@
                     boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                             && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
                     assert isLeftOuter ? !isReversed : true : "Left outer join cannot reverse roles";
-                    IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
-                            state.memForJoin * ctx.getInitialFrameSize());
+                    IDeallocatableFramePool framePool =
+                            new DeallocatableFramePool(ctx, state.memForJoin * ctx.getInitialFrameSize());
                     ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
 
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
@@ -776,11 +782,11 @@
                     // Hence the reverse relation is different.
                     boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                     assert isLeftOuter ? !isReversed : true : "Left outer join cannot reverse roles";
-                    ITuplePairComparator nljComptorOuterInner = isReversed ? nljComparatorBuild2Probe
-                            : nljComparatorProbe2Build;
-                    NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd),
-                            new FrameTupleAccessor(innerRd), nljComptorOuterInner, memorySize, predEvaluator,
-                            isLeftOuter, nonMatchWriter);
+                    ITuplePairComparator nljComptorOuterInner =
+                            isReversed ? nljComparatorBuild2Probe : nljComparatorProbe2Build;
+                    NestedLoopJoin nlj =
+                            new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd), new FrameTupleAccessor(innerRd),
+                                    nljComptorOuterInner, memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
                     nlj.setIsReversed(isReversed);
 
                     IFrame cacheBuff = new VSizeFrame(ctx);
@@ -814,7 +820,6 @@
                     }
                 }
             };
-            return op;
         }
     }
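
The recursive build loop above now records whether it is unwinding from an error, so the finally block can tell closeBuild to take the failure path instead of a normal close. A minimal, self-contained sketch of the pattern, assuming a hypothetical closeBuild(boolean) that cleans up differently after a failure (the names stand in for the real join methods):

    public class FailureAwareCloseSketch {
        static void build() throws Exception {
            throw new Exception("simulated build failure");
        }

        static void closeBuild(boolean failed) {
            // On the failure path a real implementation would discard temp
            // files instead of keeping them for the probe phase.
            System.out.println(failed ? "cleanup after failure" : "normal close");
        }

        public static void main(String[] args) {
            boolean isFailed = false;
            try {
                try {
                    build();
                } catch (Throwable th) {
                    isFailed = true; // remember that we are unwinding
                    throw th;        // propagate the original failure
                } finally {
                    closeBuild(isFailed);
                }
            } catch (Throwable th) {
                System.out.println("propagated: " + th.getMessage());
            }
        }
    }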
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 48e6b35..9ac9296 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -18,15 +18,11 @@
  */
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -38,22 +34,6 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
index fe64472..d987a35 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -19,15 +19,11 @@
 
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -39,22 +35,6 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
new file mode 100644
index 0000000..85e1bb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+
+    @Override
+    public void open() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // Does nothing.
+    }
+}
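
With this class in place, both sink descriptors return the same pushable instead of each declaring an anonymous no-op class. Even a sink that discards its input must honor the full frame writer lifecycle, since the upstream driver still calls open, nextFrame, possibly fail, and always close. A hedged sketch of that calling sequence against the new class (the empty frame is only for illustration):

    import java.nio.ByteBuffer;

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.dataflow.std.misc.SinkOperatorNodePushable;

    public class SinkLifecycleSketch {
        public static void main(String[] args) throws Exception {
            IFrameWriter writer = new SinkOperatorNodePushable();
            writer.open();
            try {
                writer.nextFrame(ByteBuffer.allocate(0)); // frames are discarded
            } catch (Throwable th) {
                writer.fail(); // a failed pipeline is signalled before close
                throw th;
            } finally {
                writer.close(); // close runs on both success and failure paths
            }
        }
    }
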
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index f1a777b..d3120bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
-import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index bcebb7d..bf2268d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -63,6 +63,13 @@
         flushWriter.open();
         try {
             getSorter().flush(flushWriter);
+        } catch (Throwable th) {
+            try {
+                flushWriter.fail();
+            } catch (Throwable secondFailure) {
+                th.addSuppressed(secondFailure);
+            }
+            throw th;
         } finally {
             flushWriter.close();
         }
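
The flush path above now fails the writer before closing it when the sort flush throws, and a failure raised by fail() itself is attached to the primary exception via addSuppressed rather than replacing it. A self-contained sketch of that suppression idiom:

    public class SuppressedFailureSketch {
        public static void main(String[] args) {
            try {
                try {
                    throw new RuntimeException("primary: flush failed");
                } catch (Throwable th) {
                    try {
                        throw new RuntimeException("secondary: fail() also failed");
                    } catch (Throwable secondFailure) {
                        th.addSuppressed(secondFailure); // keep both stack traces
                    }
                    throw th; // the primary failure still propagates
                }
            } catch (Throwable th) {
                System.out.println(th.getMessage());
                for (Throwable s : th.getSuppressed()) {
                    System.out.println("  suppressed: " + s.getMessage());
                }
            }
        }
    }
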
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index c27e4ec..752e6e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 82fedb0..3845f2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -86,23 +86,55 @@
 
     @Override
     public void close() throws HyracksDataException {
+        boolean failed = false;
+        Throwable closeException = null;
         try {
             // bulkLoader can be null if an exception is thrown before it is initialized.
             if (bulkLoader != null) {
                 bulkLoader.end();
             }
         } catch (Throwable th) {
-            throw HyracksDataException.create(th);
+            failed = true;
+            closeException = th;
+            try {
+                writer.fail();
+            } catch (Throwable failFailure) {
+                closeException.addSuppressed(failFailure);
+            }
         } finally {
             if (index != null) {
                 // If index was opened!
                 try {
                     indexHelper.close();
+                } catch (Throwable th) {
+                    if (closeException == null) {
+                        closeException = th;
+                    } else {
+                        closeException.addSuppressed(th);
+                    }
+                    if (!failed) {
+                        try {
+                            writer.fail();
+                        } catch (Throwable failFailure) {
+                            closeException.addSuppressed(failFailure);
+                        }
+                    }
                 } finally {
-                    writer.close();
+                    try {
+                        writer.close();
+                    } catch (Throwable th) {
+                        if (closeException == null) {
+                            closeException = th;
+                        } else {
+                            closeException.addSuppressed(th);
+                        }
+                    }
                 }
             }
         }
+        if (closeException != null) { // NOSONAR false positive
+            throw HyracksDataException.create(closeException);
+        }
     }
 
     @Override
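
close() above now runs every cleanup step even when an earlier one throws: the first failure becomes the primary exception, later ones are attached with addSuppressed, and a single HyracksDataException is thrown at the end. A compact sketch of that aggregation, with hypothetical step names standing in for bulkLoader.end(), indexHelper.close(), and writer.close():

    public class CloseAggregationSketch {
        static void step(String name, boolean fail) {
            if (fail) {
                throw new RuntimeException(name + " failed");
            }
        }

        public static void main(String[] args) {
            Throwable closeException = null;
            for (String name : new String[] { "end", "helperClose", "writerClose" }) {
                try {
                    step(name, !"end".equals(name)); // let the last two steps fail
                } catch (Throwable th) {
                    if (closeException == null) {
                        closeException = th; // the first failure wins
                    } else {
                        closeException.addSuppressed(th); // later ones are attached
                    }
                }
            }
            if (closeException != null) {
                throw new RuntimeException(closeException); // single exception out
            }
        }
    }
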
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 509607c..840ede4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -78,6 +78,7 @@
     protected final boolean appendIndexFilter;
     protected ArrayTupleBuilder nonFilterTupleBuild;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
+    protected boolean failed = false;
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -118,6 +119,7 @@
 
     @Override
     public void open() throws HyracksDataException {
+        failed = true;
         writer.open();
         indexHelper.open();
         index = indexHelper.getIndexInstance();
@@ -148,8 +150,9 @@
                 frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
+        failed = false;
     }
 
     protected void writeSearchResults(int tupleIndex) throws Exception {
@@ -183,6 +186,7 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        failed = true;
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
         try {
@@ -193,33 +197,44 @@
                 writeSearchResults(i);
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
+        failed = false;
     }
 
     @Override
     public void flush() throws HyracksDataException {
+        failed = true;
         appender.flush(writer);
+        failed = false;
     }
 
     @Override
     public void close() throws HyracksDataException {
-        HyracksDataException closeException = null;
+        Throwable closeException = null;
         if (index != null) {
             // if index == null, then the index open was not successful
-            try {
-                if (appender.getTupleCount() > 0) {
-                    appender.write(writer, true);
+            if (!failed) {
+                try {
+                    if (appender.getTupleCount() > 0) {
+                        appender.write(writer, true);
+                    }
+                } catch (Throwable th) {
+                    closeException = th;
+                    failed = true;
+                    try {
+                        writer.fail();
+                    } catch (Throwable failException) {
+                        closeException.addSuppressed(failException);
+                    }
                 }
-            } catch (Throwable th) {
-                closeException = new HyracksDataException(th);
             }
 
             try {
                 cursor.close();
             } catch (Throwable th) {
                 if (closeException == null) {
-                    closeException = new HyracksDataException(th);
+                    closeException = th;
                 } else {
                     closeException.addSuppressed(th);
                 }
@@ -228,7 +243,7 @@
                 indexHelper.close();
             } catch (Throwable th) {
                 if (closeException == null) {
-                    closeException = new HyracksDataException(th);
+                    closeException = th;
                 } else {
                     closeException.addSuppressed(th);
                 }
@@ -239,18 +254,19 @@
             writer.close();
         } catch (Throwable th) {
             if (closeException == null) {
-                closeException = new HyracksDataException(th);
+                closeException = th;
             } else {
                 closeException.addSuppressed(th);
             }
         }
         if (closeException != null) {
-            throw closeException;
+            throw HyracksDataException.create(closeException);
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
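
The failed flag is set pessimistically before each call that can throw and cleared only on success, so close() can tell whether the final appender flush is still safe; once a failure has been observed, the flush is skipped and only resources are released. A minimal sketch of the guard, with a hypothetical record-processing method in place of the real nextFrame and flush bodies:

    public class FailedFlagSketch {
        private boolean failed = false;

        void nextRecord(boolean blowUp) {
            failed = true; // assume failure until proven otherwise
            if (blowUp) {
                throw new RuntimeException("mid-stream failure");
            }
            failed = false; // cleared only when the call completes
        }

        void close() {
            if (!failed) {
                System.out.println("flush remaining tuples");
            } else {
                System.out.println("skip flush; the pipeline already failed");
            }
        }

        public static void main(String[] args) {
            FailedFlagSketch op = new FailedFlagSketch();
            try {
                op.nextRecord(true);
            } catch (RuntimeException e) {
                // here the driver would call fail() on the downstream writer
            } finally {
                op.close(); // prints the skip message
            }
        }
    }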
 
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..b100300 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -20,8 +20,10 @@
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -33,6 +35,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -155,4 +158,9 @@
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set<JobFlag> getJobFlags() {
+        return EnumSet.noneOf(JobFlag.class);
+    }
 }
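
TestTaskContext now satisfies the getJobFlags contract by reporting that no flags are set. A hedged sketch of a consumer-side check (the profiling use is illustrative only, though JobFlag.PROFILE_RUNTIME is an existing Hyracks flag):

    import java.util.EnumSet;
    import java.util.Set;

    import org.apache.hyracks.api.job.JobFlag;

    public class JobFlagsSketch {
        public static void main(String[] args) {
            // What the test context returns: an empty flag set.
            Set<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
            boolean profiling = flags.contains(JobFlag.PROFILE_RUNTIME);
            System.out.println("profiling enabled: " + profiling); // false in tests
        }
    }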