Support IFrameWriter contract check.

- add a instance-level flag for injecting operators to
  check IFrameWriter contract violations;
- check contract violations in runtime tests.

Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1618
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 9d01d63..ba98f73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.net.Inet4Address;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -42,9 +41,6 @@
 import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -131,6 +127,7 @@
         ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT);
         ccConfig.setResultTTL(120000L);
         ccConfig.setResultSweepThreshold(1000L);
+        ccConfig.setEnforceFrameWriterProtocol(true);
         configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb"));
         return ccConfig;
     }
@@ -215,13 +212,6 @@
         }
     }
 
-    public void runJob(JobSpecification spec) throws Exception {
-        GlobalConfig.ASTERIX_LOGGER.info(spec.toJSON().toString());
-        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
-        GlobalConfig.ASTERIX_LOGGER.info(jobId.toString());
-        hcc.waitForCompletion(jobId);
-    }
-
     protected String getDefaultStoragePath() {
         return joinPath("target", "io", "dir");
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 80c05db..7ce9df6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -191,8 +192,10 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.UnmanagedFileSplit;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 /*
@@ -214,6 +217,7 @@
     protected final IRewriterFactory rewriterFactory;
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
+    protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
 
     public QueryTranslator(ICcApplicationContext appCtx, List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
@@ -228,6 +232,9 @@
         rewriterFactory = compliationProvider.getRewriterFactory();
         activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
         this.executorService = executorService;
+        if (appCtx.getServiceContext().getAppConfig().getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)) {
+            this.jobFlags.add(JobFlag.ENFORCE_CONTRACT);
+        }
     }
 
     public SessionOutput getSessionOutput() {
@@ -621,7 +628,7 @@
                 progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
 
                 // #. runJob
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
 
                 // #. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -653,7 +660,7 @@
                     JobSpecification jobSpec = DatasetUtil.dropDatasetJobSpec(dataset, metadataProvider);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -900,7 +907,7 @@
                                 "Failed to create job spec for replicating Files Index For external dataset");
                     }
                     filesIndexReplicated = true;
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 }
             }
 
@@ -937,7 +944,7 @@
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             // #. create the index artifact in NC.
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
@@ -948,7 +955,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -986,7 +993,7 @@
                             ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1005,7 +1012,7 @@
                     JobSpecification jobSpec = IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     if (bActiveTxn) {
@@ -1189,7 +1196,7 @@
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1226,7 +1233,7 @@
                 // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        runJob(hcc, jobSpec);
                     }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to be compensated.
@@ -1405,7 +1412,7 @@
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 }
 
                 // #. begin a new transaction
@@ -1466,7 +1473,7 @@
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
                 for (JobSpecification jobSpec : jobsToExecute) {
-                    JobUtils.runJob(hcc, jobSpec, true);
+                    runJob(hcc, jobSpec);
                 }
 
                 // #. begin a new transaction
@@ -1496,7 +1503,7 @@
                 // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        runJob(hcc, jobSpec);
                     }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to be compensated.
@@ -1659,7 +1666,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
-                JobUtils.runJob(hcc, spec, true);
+                runJob(hcc, spec);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -1725,7 +1732,7 @@
                 if (jobSpec == null) {
                     return jobSpec;
                 }
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             } finally {
                 locker.unlock();
             }
@@ -1753,7 +1760,7 @@
             bActiveTxn = false;
 
             if (jobSpec != null && !compileOnly) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
             return jobSpec;
         } catch (Exception e) {
@@ -1935,7 +1942,7 @@
             } else {
                 JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
                         MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
-                JobUtils.runJob(hcc, spec, true);
+                runJob(hcc, spec);
                 MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, feedName);
             }
 
@@ -2022,6 +2029,9 @@
             activeEventHandler.registerListener(listener);
             IActiveEventSubscriber eventSubscriber = listener.subscribe(ActivityState.STARTED);
             feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
+
+            // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
+            // We will need to design general exception handling mechanism for feeds.
             JobUtils.runJob(hcc, feedJob,
                     Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
             eventSubscriber.sync();
@@ -2211,7 +2221,7 @@
 
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                runJob(hcc, jobSpec);
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2300,14 +2310,14 @@
                 }
                 break;
             case IMMEDIATE:
-                createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+                createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
                 }, clientContextId, ctx);
                 break;
             case DEFERRED:
-                createAndRunJob(hcc, null, compiler, locker, resultDelivery, id -> {
+                createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId));
                     if (outMetadata != null) {
                         outMetadata.getResultSets()
@@ -2325,7 +2335,7 @@
             ResultSetId resultSetId, MutableBoolean printed) {
         Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
         try {
-            createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id -> {
+            createAndRunJob(hcc, jobFlags, jobId, compiler, locker, resultDelivery, id -> {
                 final ResultHandle handle = new ResultHandle(id, resultSetId);
                 ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.RUNNING);
                 ResultUtil.printResultHandle(sessionOutput, handle);
@@ -2353,16 +2363,20 @@
         }
     }
 
-    private static void createAndRunJob(IHyracksClientConnection hcc, Mutable<JobId> jId, IStatementCompiler compiler,
-            IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId,
-            IStatementExecutorContext ctx) throws Exception {
+    private void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec) throws Exception {
+        JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+    }
+
+    private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
+            IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
+            String clientContextId, IStatementExecutorContext ctx) throws Exception {
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
             if (jobSpec == null) {
                 return;
             }
-            final JobId jobId = JobUtils.runJob(hcc, jobSpec, false);
+            final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (ctx != null && clientContextId != null) {
                 ctx.put(clientContextId, jobId); // Adds the running job into the context.
             }
@@ -2507,14 +2521,14 @@
             transactionState = TransactionState.BEGIN;
 
             // run the files update job
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles,
                             appendedFiles, metadataProvider);
                     // run the files update job
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 }
             }
 
@@ -2533,7 +2547,7 @@
             bActiveTxn = false;
             transactionState = TransactionState.READY_TO_COMMIT;
             // We don't release the latch since this job is expected to be quick
-            JobUtils.runJob(hcc, spec, true);
+            runJob(hcc, spec);
             // Start a new metadata transaction to record the final state of the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2602,7 +2616,7 @@
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 try {
-                    JobUtils.runJob(hcc, spec, true);
+                    runJob(hcc, spec);
                 } catch (Exception e2) {
                     // This should never happen -- fix throw illegal
                     e.addSuppressed(e2);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f99793a..7d4b41d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -205,7 +205,7 @@
                 NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false);
         BTreeSearchOperatorNodePushable searchOp =
                 searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), PARTITION, 1);
-        emptyTupleOp.setFrameWriter(0, searchOp,
+        emptyTupleOp.setOutputFrameWriter(0, searchOp,
                 primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
         searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
         return emptyTupleOp;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 5e3da5f..b80f8f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -40,6 +40,9 @@
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -59,6 +62,11 @@
         ExternalProperties mockAsterixExternalProperties = mock(ExternalProperties.class);
         when(mockAsterixAppContextInfo.getExternalProperties()).thenReturn(mockAsterixExternalProperties);
         when(mockAsterixExternalProperties.getAPIServerPort()).thenReturn(19002);
+        ICCServiceContext mockServiceContext = mock(ICCServiceContext.class);
+        when(mockAsterixAppContextInfo.getServiceContext()).thenReturn(mockServiceContext);
+        IApplicationConfig mockApplicationConfig = mock(IApplicationConfig.class);
+        when(mockServiceContext.getAppConfig()).thenReturn(mockApplicationConfig);
+        when(mockApplicationConfig.getBoolean(CCConfig.Option.ENFORCE_FRAME_WRITER_PROTOCOL)).thenReturn(true);
 
         // Mocks AsterixClusterProperties.
         Cluster mockCluster = mock(Cluster.class);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index 41f9e67..cacbfbc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -19,7 +19,10 @@
 
 package org.apache.asterix.common.utils;
 
+import java.util.EnumSet;
+
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
@@ -32,8 +35,13 @@
 
     public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
             throws Exception {
+        return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion);
+    }
+
+    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags,
+            boolean waitForCompletion) throws Exception {
         spec.setMaxReattempts(0);
-        final JobId jobId = hcc.startJob(spec);
+        final JobId jobId = hcc.startJob(spec, jobFlags);
         if (waitForCompletion) {
             hcc.waitForCompletion(jobId);
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index 71acecf..d0b7b47 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -21,28 +21,20 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 
 public class SinkPOperator extends AbstractPhysicalOperator {
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
new file mode 100644
index 0000000..26002e6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/EnforcePushRuntime.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class EnforcePushRuntime extends EnforceFrameWriter implements IPushRuntime {
+
+    private final IPushRuntime pushRuntime;
+
+    private EnforcePushRuntime(IPushRuntime pushRuntime) {
+        super(pushRuntime);
+        this.pushRuntime = pushRuntime;
+    }
+
+    @Override
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        pushRuntime.setOutputFrameWriter(index, writer, recordDesc);
+    }
+
+    @Override
+    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        pushRuntime.setInputRecordDescriptor(index, recordDescriptor);
+    }
+
+    public static IPushRuntime enforce(IPushRuntime pushRuntime) {
+        return pushRuntime instanceof EnforcePushRuntime || pushRuntime instanceof NestedTupleSourceRuntime
+                ? pushRuntime : new EnforcePushRuntime(pushRuntime);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
index 27d6900..de00697 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntime.java
@@ -22,7 +22,26 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public interface IPushRuntime extends IFrameWriter {
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
 
-    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
+    /**
+     * Sets the output frame writer for this writer.
+     *
+     * @param index,
+     *            the index of the output channel.
+     * @param writer,
+     *            the writer for writing output.
+     * @param recordDesc,
+     *            the output record descriptor.
+     */
+    void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+    /**
+     * Sets the input record descriptor for this writer.
+     *
+     * @param index,
+     *            the index of the input channel.
+     * @param recordDescriptor,
+     *            the corresponding input record descriptor.
+     */
+    void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor);
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 42e5157..e99b61b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -124,6 +124,7 @@
 
             @Override
             public void fail() throws HyracksDataException {
+                failed = true;
                 if (isOpen) {
                     writer.fail();
                 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index b397f23..893aa61 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -29,6 +30,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
@@ -52,7 +54,6 @@
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException {
-
         final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
@@ -91,8 +92,8 @@
             }
 
             @Override
-            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor, int tIndex,
-                    AggregateState state) throws HyracksDataException {
+            public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor stateAccessor,
+                    int tIndex, AggregateState state) throws HyracksDataException {
                 for (int i = 0; i < pipelines.length; i++) {
                     outputWriter.setInputIdx(i);
                     pipelines[i].close();
@@ -144,9 +145,13 @@
         IFrameWriter start = writer;
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
-            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforcePushRuntime.enforce(start) : start;
+            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
             } else {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 52245e1..8b8e320 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
@@ -28,8 +29,10 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -58,11 +61,14 @@
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults,
             final IFrameWriter writer) throws HyracksDataException {
-        final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
-                decorFieldIdx.length, writer);
+        final RunningAggregatorOutput outputWriter =
+                new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer);
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
+        IFrameWriter enforcedWriter = enforce ? EnforceFrameWriter.enforce(outputWriter) : outputWriter;
         final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
-                pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], outputWriter, ctx);
+            pipelines[i] = (NestedTupleSourceRuntime) assemblePipeline(subplans[i], enforcedWriter, ctx);
         }
 
         final ArrayTupleBuilder gbyTb = outputWriter.getGroupByTupleBuilder();
@@ -136,13 +142,17 @@
 
     private IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx)
             throws HyracksDataException {
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = runtimeFactories[i].createPushRuntime(ctx);
-            newRuntime.setFrameWriter(0, start, recordDescriptors[i]);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforceFrameWriter.enforce(start) : start;
+            newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 1]);
             } else {
@@ -206,8 +216,9 @@
                 int start = 0;
                 int offset = 0;
                 for (int i = 0; i < fieldEnds.length; i++) {
-                    if (i > 0)
+                    if (i > 0) {
                         start = fieldEnds[i - 1];
+                    }
                     offset = fieldEnds[i] - start;
                     tb.addField(data, start, offset);
                 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 33e7c73..5cced8d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -29,7 +29,7 @@
     protected boolean failed;
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         this.writer = writer;
         this.outputRecordDesc = recordDesc;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
index e430461..d47199d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSinkPushRuntime.java
@@ -26,7 +26,7 @@
     protected RecordDescriptor inputRecordDesc;
 
     @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         throw new IllegalStateException();
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index b7707d4..35563e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -27,20 +27,29 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // nextFrame will never be called on this runtime
         throw new UnsupportedOperationException();
     }
 
     @Override
     public void close() throws HyracksDataException {
+        // close is a no op since this operator completes operating in open()
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        writer.fail();
+        // fail is a no op since if a failure happened, the operator would've already called fail() on downstream
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+        // flush will never be called on this runtime
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+        // setInputRecordDescriptor will never be called on this runtime since it has no input
         throw new UnsupportedOperationException();
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 1294614..f6ebf19 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -112,6 +112,7 @@
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
 
             private IFrameWriter startOfPipeline;
+            private boolean opened = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -124,6 +125,7 @@
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
                     startOfPipeline = pa.assemblePipeline(writer, ctx);
                 }
+                opened = true;
                 startOfPipeline.open();
             }
 
@@ -134,12 +136,16 @@
 
             @Override
             public void close() throws HyracksDataException {
-                startOfPipeline.close();
+                if (opened) {
+                    startOfPipeline.close();
+                }
             }
 
             @Override
             public void fail() throws HyracksDataException {
-                startOfPipeline.fail();
+                if (opened) {
+                    startOfPipeline.fail();
+                }
             }
 
             @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 03e2aaf..e1081e0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -19,11 +19,14 @@
 package org.apache.hyracks.algebricks.runtime.operators.meta;
 
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 
 public class PipelineAssembler {
 
@@ -44,18 +47,21 @@
         this.outputArity = outputArity;
     }
 
-    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws
-            HyracksDataException {
+    public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
+        // should enforce protocol
+        boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;// this.writer;
         for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
             IPushRuntime newRuntime = pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
+            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : newRuntime;
+            start = enforce ? EnforceFrameWriter.enforce(start) : start;
             if (i == pipeline.getRuntimeFactories().length - 1) {
                 if (outputArity == 1) {
-                    newRuntime.setFrameWriter(0, start, pipelineOutputRecordDescriptor);
+                    newRuntime.setOutputFrameWriter(0, start, pipelineOutputRecordDescriptor);
                 }
             } else {
-                newRuntime.setFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
+                newRuntime.setOutputFrameWriter(0, start, pipeline.getRecordDescriptors()[i]);
             }
             if (i > 0) {
                 newRuntime.setInputRecordDescriptor(0, pipeline.getRecordDescriptors()[i - 1]);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 3e30f73..925ff93 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -79,11 +79,6 @@
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                writer.fail();
-            }
-
-            @Override
             public void close() throws HyracksDataException {
                 try {
                     frameSorter.sort();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 2b7c2da..3ccceed 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -56,6 +56,11 @@
             }
 
             @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
             public void close() throws HyracksDataException {
                 writer.close();
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index e123adf..496679f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -65,11 +65,6 @@
         }
 
         @Override
-        public void fail() throws HyracksDataException {
-            writer.fail();
-        }
-
-        @Override
         public void flush() throws HyracksDataException {
             writer.flush();
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 38fe7d1..33b7725 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -118,7 +118,7 @@
             @Override
             public void fail() throws HyracksDataException {
                 if (isOpen) {
-                    super.fail();
+                    writer.fail();
                 }
             }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
index ebf3d3a..55146e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntime.java
@@ -81,6 +81,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        // fail() is a no op. in close we will cleanup
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 0f57fd7..171544d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -85,7 +85,6 @@
             private IScalarEvaluator eval;
             private IMissingWriter missingWriter = null;
             private ArrayTupleBuilder missingTupleBuilder = null;
-            private boolean isOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
@@ -93,7 +92,6 @@
                     initAccessAppendFieldRef(ctx);
                     eval = cond.createScalarEvaluator(ctx);
                 }
-                isOpen = true;
                 writer.open();
 
                 //prepare nullTupleBuilder
@@ -107,20 +105,11 @@
             }
 
             @Override
-            public void fail() throws HyracksDataException {
-                if (isOpen) {
-                    super.fail();
-                }
-            }
-
-            @Override
             public void close() throws HyracksDataException {
-                if (isOpen) {
-                    try {
-                        flushIfNotFailed();
-                    } finally {
-                        writer.close();
-                    }
+                try {
+                    flushIfNotFailed();
+                } finally {
+                    writer.close();
                 }
             }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index aa9232e..e2868ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -177,7 +177,7 @@
         }
 
         public StartJobFunction(JobId jobId) {
-            this(null, null, null, jobId);
+            this(null, null, EnumSet.noneOf(JobFlag.class), jobId);
         }
 
         public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index ad54110..75cbf61 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -102,14 +102,14 @@
 
     @Override
     public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
-                jobSpec);
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return startJob(jsacggf, jobFlags);
     }
 
     @Override
     public JobId distributeJob(JobSpecification jobSpec) throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+        IActivityClusterGraphGeneratorFactory jsacggf =
                 new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return distributeJob(jsacggf);
     }
@@ -212,15 +212,14 @@
     @Override
     public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
             throws Exception {
-        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
-                jobSpec);
+        IActivityClusterGraphGeneratorFactory jsacggf =
+                new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
         return startJob(deploymentId, jsacggf, jobFlags);
     }
 
     @Override
     public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
-            EnumSet<JobFlag> jobFlags)
-            throws Exception {
+            EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index c8e4cf8..df693b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.context;
 
 import java.io.Serializable;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -26,6 +27,7 @@
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
 
@@ -48,4 +50,6 @@
     void setSharedObject(Object object);
 
     Object getSharedObject();
+
+    Set<JobFlag> getJobFlags();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
new file mode 100644
index 0000000..bf54e01
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class EnforceFrameWriter implements IFrameWriter {
+
+    // The downstream data consumer of this writer.
+    private final IFrameWriter writer;
+
+    // A flag that indicates whether the data consumer of this writer has failed.
+    private boolean downstreamFailed = false;
+
+    // A flag that indicates whether the the data producer of this writer has called fail() for this writer.
+    // There could be two cases:
+    // CASE 1: the downstream of this writer fails and the exception is propagated to the source operator, which
+    //         cascades to the fail() of this writer;
+    // CASE 2: the failure happens in the upstream of this writer and the source operator cascades to the fail()
+    //         of this writer.
+    private boolean failCalledByUpstream = false;
+
+    // A flag that indicates whether the downstream of this writer is open.
+    private boolean downstreamOpen = false;
+
+    protected EnforceFrameWriter(IFrameWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public final void open() throws HyracksDataException {
+        try {
+            if (downstreamOpen) {
+                throw HyracksDataException.create(ErrorCode.OPEN_ON_OPEN_WRITER);
+            }
+            if (downstreamFailed || failCalledByUpstream) {
+                throw HyracksDataException.create(ErrorCode.OPEN_ON_FAILED_WRITER);
+            }
+            writer.open();
+            downstreamOpen = true;
+        } catch (Throwable th) {
+            downstreamFailed = true;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (!downstreamOpen) {
+            throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_CLOSED_WRITER);
+        }
+        if (downstreamFailed || failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.NEXT_FRAME_ON_FAILED_WRITER);
+        }
+        try {
+            writer.nextFrame(buffer);
+        } catch (Throwable th) {
+            downstreamFailed = true;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void flush() throws HyracksDataException {
+        if (!downstreamOpen) {
+            throw HyracksDataException.create(ErrorCode.FLUSH_ON_CLOSED_WRITER);
+        }
+        if (downstreamFailed || failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.FLUSH_ON_FAILED_WRITER);
+        }
+        try {
+            writer.flush();
+        } catch (Throwable th) {
+            downstreamFailed = true;
+            throw th;
+        }
+    }
+
+    @Override
+    public final void fail() throws HyracksDataException {
+        writer.fail();
+        if (failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.FAIL_ON_FAILED_WRITER);
+        }
+        failCalledByUpstream = true;
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+        downstreamOpen = false;
+        if (downstreamFailed && !failCalledByUpstream) {
+            throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL);
+        }
+    }
+
+    public static IFrameWriter enforce(IFrameWriter writer) {
+        return writer instanceof EnforceFrameWriter ? writer : new EnforceFrameWriter(writer);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
index f6c201e..82433e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -23,16 +23,15 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IOperatorNodePushable {
-    public void initialize() throws HyracksDataException;
+    void initialize() throws HyracksDataException;
 
-    public void deinitialize() throws HyracksDataException;
+    void deinitialize() throws HyracksDataException;
 
-    public int getInputArity();
+    int getInputArity();
 
-    public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
-            throws HyracksDataException;
+    void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) throws HyracksDataException;
 
-    public IFrameWriter getInputFrameWriter(int index);
+    IFrameWriter getInputFrameWriter(int index);
 
-    public String getDisplayName();
+    String getDisplayName();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b52a6a5..8f36fcd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -92,6 +92,14 @@
     public static final int DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX = 56;
     public static final int CANNOT_FIND_MATTER_TUPLE_FOR_ANTI_MATTER_TUPLE = 57;
     public static final int TASK_ABORTED = 58;
+    public static final int OPEN_ON_OPEN_WRITER = 59;
+    public static final int OPEN_ON_FAILED_WRITER = 60;
+    public static final int NEXT_FRAME_ON_FAILED_WRITER = 61;
+    public static final int NEXT_FRAME_ON_CLOSED_WRITER = 62;
+    public static final int FLUSH_ON_FAILED_WRITER = 63;
+    public static final int FLUSH_ON_CLOSED_WRITER = 64;
+    public static final int FAIL_ON_FAILED_WRITER = 65;
+    public static final int MISSED_FAIL_CALL = 66;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index a33c6c9a..7225cd4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,5 +19,6 @@
 package org.apache.hyracks.api.job;
 
 public enum JobFlag {
-    PROFILE_RUNTIME
+    PROFILE_RUNTIME,
+    ENFORCE_CONTRACT
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 314bf8b..7fdf106 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,12 +35,14 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -90,18 +92,18 @@
     private void init() throws HyracksDataException {
         Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<>();
         List<IConnectorDescriptor> outputConnectors;
-
+        final boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         /*
          * Set up the source operators
          */
         for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
-            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
-                    nPartitions);
+            IOperatorNodePushable opPushable =
+                    entry.getValue().createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
             operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
             inputArity += opPushable.getInputArity();
-            outputConnectors = MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(),
-                    Collections.emptyList());
+            outputConnectors =
+                    MapUtils.getObject(parent.getActivityOutputMap(), entry.getKey(), Collections.emptyList());
             for (IConnectorDescriptor conn : outputConnectors) {
                 childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
             }
@@ -131,7 +133,9 @@
             /*
              * construct the dataflow connection from a producer to a consumer
              */
-            sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+            IFrameWriter writer = destOp.getInputFrameWriter(inputChannel);
+            writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
+            sourceOp.setOutputFrameWriter(outputChannel, writer,
                     recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
 
             /*
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 35a2fc5..4bf069c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -75,6 +75,13 @@
 56 = LSM disk component scan is not allowed for a secondary index
 57 = Couldn't find the matter tuple for anti-matter tuple in the primary index
 58 = Task %1$s was aborted
+59 = Data pipeline protocol violation: open() is called on a opened writer
+60 = Data pipeline protocol violation: open() is called on a failed writer
+61 = Data pipeline protocol violation: nextFrame() is called on a failed writer
+62 = Data pipeline protocol violation: nextFrame() is called on a closed writer
+63 = Data pipeline protocol violation: flush() is called on a failed writer
+64 = Data pipeline protocol violation: flush() is called on a closed writer
+65 = Data pipeline protocol violation: fail() is called twice on a writer
+66 = Data pipeline protocol violation: fail() is not called by the upstream when there is a failure in the downstream
 
-# 10000 ---- 19999: compilation errors
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 55a7a82..95a6d9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -20,7 +20,6 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -116,10 +115,10 @@
     }
 
     //Run a Pre-distributed job by passing the JobId
-    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags,
             PreDistributedJobDescriptor distributedJobDescriptor)
             throws HyracksException {
-        this(deploymentId, jobId, EnumSet.noneOf(JobFlag.class),
+        this(deploymentId, jobId, jobFlags,
                 distributedJobDescriptor.getJobSpecification(), distributedJobDescriptor.getActivityClusterGraph());
         Set<Constraint> constaints = distributedJobDescriptor.getActivityClusterGraphConstraints();
         this.scheduler = new JobExecutor(ccs, this, constaints, true);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index 2dbb631..e083d2a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -68,7 +68,7 @@
                 run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags);
             } else {
                 //ActivityClusterGraph has already been distributed
-                run = new JobRun(ccs, deploymentId, jobId,
+                run = new JobRun(ccs, deploymentId, jobId, jobFlags,
                         ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId));
             }
             jobManager.add(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df3a..cbc6146 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
@@ -64,7 +65,8 @@
         CLUSTER_TOPOLOGY(STRING),
         JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
         JOB_QUEUE_CAPACITY(INTEGER, 4096),
-        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
+        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
+        ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false);
 
         private final IOptionType parser;
         private Object defaultValue;
@@ -156,6 +158,9 @@
                     return "The maximum number of jobs to queue before rejecting new jobs";
                 case JOB_MANAGER_CLASS:
                     return "Specify the implementation class name for the job manager";
+                case ENFORCE_FRAME_WRITER_PROTOCOL:
+                    return "A flag indicating if runtime should enforce frame writer protocol and detect "
+                            + "bad behaving operators";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -357,4 +362,12 @@
     public int getJobQueueCapacity() {
         return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY);
     }
+
+    public boolean getEnforceFrameWriterProtocol() {
+        return getAppConfig().getBoolean(Option.ENFORCE_FRAME_WRITER_PROTOCOL);
+    }
+
+    public void setEnforceFrameWriterProtocol(boolean enforce) {
+        configManager.set(Option.ENFORCE_FRAME_WRITER_PROTOCOL, enforce);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 42726a7..0fc3be7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -64,8 +64,10 @@
         MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
         MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
         CLUSTER_CONNECT_RETRIES(INTEGER, 5),
-        IODEVICES(STRING_ARRAY, appConfig -> new String[] {
-                FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
+        IODEVICES(
+                STRING_ARRAY,
+                appConfig -> new String[] {
+                        FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "iodevice") },
                 "<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/iodevice"),
         NET_THREAD_COUNT(INTEGER, 1),
         NET_BUFFER_COUNT(INTEGER, 1),
@@ -95,7 +97,7 @@
         }
 
         <T> Option(IOptionType<T> parser, Function<IApplicationConfig, T> defaultValue,
-                   String defaultValueDescription) {
+                String defaultValueDescription) {
             this.parser = parser;
             this.defaultValue = defaultValue;
             this.defaultValueDescription = defaultValueDescription;
@@ -246,6 +248,7 @@
         return configManager;
     }
 
+    @Override
     public IApplicationConfig getAppConfig() {
         return appConfig;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 04d48f3..d689bc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -52,6 +52,7 @@
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IWorkspaceFileFactory;
 import org.apache.hyracks.api.job.IOperatorEnvironment;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounter;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.partitions.PartitionId;
@@ -104,9 +105,13 @@
 
     private Object sharedObject;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
-            NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
+    private final Set<JobFlag> jobFlags;
+
+    public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName,
+            ExecutorService executor, NodeControllerService ncs,
+            List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
+        this.jobFlags = jobFlags;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
         this.executorService = executor;
@@ -426,4 +431,9 @@
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set<JobFlag> getJobFlags() {
+        return jobFlags;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 02e8051..95f3e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -131,12 +132,10 @@
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+                task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
-
                 List<IPartitionCollector> collectors = new ArrayList<>();
-
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
                         IConnectorDescriptor conn = inputs.get(i);
@@ -145,26 +144,28 @@
                             LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
                         RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
-                        IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
-                                recordDesc, cPolicy);
+                        IPartitionCollector collector =
+                                createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
                         collectors.add(collector);
                     }
                 }
                 List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
                 if (outputs != null) {
+                    final boolean enforce = flags.contains(JobFlag.ENFORCE_CONTRACT);
                     for (int i = 0; i < outputs.size(); ++i) {
                         final IConnectorDescriptor conn = outputs.get(i);
                         RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                         IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
 
-                        IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
-                                partition, taId, flags);
+                        IPartitionWriterFactory pwFactory =
+                                createPartitionWriterFactory(task, cPolicy, jobId, conn, partition, taId, flags);
 
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + conn.getConnectorId());
                         }
-                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
-                                .getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        writer = enforce ? EnforceFrameWriter.enforce(writer) : writer;
                         operator.setOutputFrameWriter(i, writer, recordDesc);
                     }
                 }
@@ -203,11 +204,11 @@
     private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
             int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
             throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
-                .getInputPartitionCounts()[i], td.getPartitionCount());
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+                td.getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
-                    .getTaskAttemptId(), ncs.getExecutor());
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+                    task.getTaskAttemptId(), ncs.getExecutor());
         } else {
             return collector;
         }
@@ -222,8 +223,9 @@
                 factory = new IPartitionWriterFactory() {
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
                     }
                 };
             } else {
@@ -231,9 +233,9 @@
 
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
-                                        .getExecutor());
+                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(),
+                                new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+                                ncs.getExecutor());
                     }
                 };
             }
@@ -241,8 +243,8 @@
             factory = new IPartitionWriterFactory() {
                 @Override
                 public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
-                            .getConnectorId(), senderIndex, receiverIndex), taId);
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(),
+                            new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId);
                 }
             };
         }
@@ -272,11 +274,14 @@
                 if (inputAddresses[i] != null) {
                     for (int j = 0; j < inputAddresses[i].length; j++) {
                         NetworkAddress networkAddress = inputAddresses[i][j];
-                        PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
-                                .getTaskAttemptId().getTaskId().getPartition());
-                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
-                                .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
-                                        .lookupIpAddress()), networkAddress.getPort()), pid, 5));
+                        PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j,
+                                td.getTaskAttemptId().getTaskId().getPartition());
+                        PartitionChannel channel = new PartitionChannel(pid,
+                                new NetworkInputChannel(ncs.getNetworkManager(),
+                                        new InetSocketAddress(
+                                                InetAddress.getByAddress(networkAddress.lookupIpAddress()),
+                                                networkAddress.getPort()),
+                                        pid, 5));
                         channels.add(channel);
                     }
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7901141..3105d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -190,6 +190,9 @@
             }
             aggregator.close();
             aggregateState.close();
+        } catch (Exception e) {
+            appenderWrapper.fail();
+            throw e;
         } finally {
             appenderWrapper.close();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 48e6b35..9ac9296 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -18,15 +18,11 @@
  */
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class NullSinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -38,22 +34,6 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
index fe64472..d987a35 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -19,15 +19,11 @@
 
 package org.apache.hyracks.dataflow.std.misc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
 public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -39,22 +35,6 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputSinkOperatorNodePushable() {
-            @Override
-            public void open() throws HyracksDataException {
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-            }
-        };
+        return new SinkOperatorNodePushable();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
new file mode 100644
index 0000000..85e1bb8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SinkOperatorNodePushable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+
+    @Override
+    public void open() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        // Does nothing.
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // Does nothing.
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
index bcebb7d..3c11669 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSortRunGenerator.java
@@ -63,6 +63,9 @@
         flushWriter.open();
         try {
             getSorter().flush(flushWriter);
+        } catch (Exception e) {
+            flushWriter.fail();
+            throw e;
         } finally {
             flushWriter.close();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 509607c..0352cea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -78,6 +78,7 @@
     protected final boolean appendIndexFilter;
     protected ArrayTupleBuilder nonFilterTupleBuild;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
+    protected boolean failed = false;
 
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory,
@@ -193,7 +194,7 @@
                 writeSearchResults(i);
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -207,12 +208,15 @@
         HyracksDataException closeException = null;
         if (index != null) {
             // if index == null, then the index open was not successful
-            try {
-                if (appender.getTupleCount() > 0) {
-                    appender.write(writer, true);
+            if (!failed) {
+                try {
+                    if (appender.getTupleCount() > 0) {
+                        appender.write(writer, true);
+                    }
+                } catch (Throwable th) {
+                    writer.fail();
+                    closeException = new HyracksDataException(th);
                 }
-            } catch (Throwable th) {
-                closeException = new HyracksDataException(th);
             }
 
             try {
@@ -251,6 +255,7 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         writer.fail();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 2171122..b100300 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -20,8 +20,10 @@
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
@@ -33,6 +35,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -155,4 +158,9 @@
     public Object getSharedObject() {
         return sharedObject;
     }
+
+    @Override
+    public Set<JobFlag> getJobFlags() {
+        return EnumSet.noneOf(JobFlag.class);
+    }
 }