diff --git a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 0daee1e..48bb6a9 100644
--- a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -44,13 +44,16 @@
     @Override
     public void initialize() throws HyracksDataException {
         FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        if (fieldSlots != null && tupleData != null && tupleSize > 0)
+        if (fieldSlots != null && tupleData != null && tupleSize > 0) {
             appender.append(fieldSlots, tupleData, 0, tupleSize);
-        writer.open();
-        try {
-            appender.flush(writer, true);
         }
-        finally {
+        try {
+            writer.open();
+            appender.flush(writer, true);
+        } catch (Throwable th) {
+            writer.fail();
+            throw new HyracksDataException(th);
+        } finally {
             writer.close();
         }
     }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 4118f5e..51168ae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -40,6 +40,7 @@
 public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     private final boolean isPrimary;
+    private AbstractLSMIndex lsmIndex;
 
     public boolean isPrimary() {
         return isPrimary;
@@ -57,10 +58,10 @@
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
-        writer.open();
         indexHelper.open();
-        AbstractLSMIndex lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
+        lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
+            writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
@@ -69,15 +70,11 @@
                 tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
                 frameTuple = new FrameTupleReference();
             }
-
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                     .getApplicationContext().getApplicationObject();
-
             AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
-
-        } catch (Exception e) {
-            indexHelper.close();
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         }
     }
 
@@ -114,18 +111,34 @@
                         }
                         break;
                     default: {
-                        throw new HyracksDataException("Unsupported operation " + op
-                                + " in tree index InsertDelete operator");
+                        throw new HyracksDataException(
+                                "Unsupported operation " + op + " in tree index InsertDelete operator");
                     }
                 }
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         }
         writeBuffer.ensureFrameSize(buffer.capacity());
         FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
         FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
     }
 
+    @Override
+    public void close() throws HyracksDataException {
+        if (lsmIndex != null) {
+            try {
+                indexHelper.close();
+            } finally {
+                writer.close();
+            }
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (lsmIndex != null) {
+            writer.fail();
+        }
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
index f5f89f7..d0e371e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
@@ -98,8 +98,11 @@
 
     @Override
     public void close() throws HyracksDataException {
-        frameDistributor.close();
-        writer.close();
+        try {
+            frameDistributor.close();
+        } finally {
+            writer.close();
+        }
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index d8865b4..cc7a212 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -60,6 +60,7 @@
         indexHelper.open();
         index = indexHelper.getIndexInstance();
         try {
+            writer.open();
             // Transactional BulkLoader
             bulkLoader = ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length,
                     checkIfEmptyIndex);
@@ -69,9 +70,7 @@
                 filesIndexDescription.getBuddyBTreeTupleFromFileNumber(deleteTuple, buddyBTreeTupleBuilder, fileNumber);
                 ((ITwoPCIndexBulkLoader) bulkLoader).delete(deleteTuple);
             }
-        } catch (Exception e) {
-            ((ITwoPCIndexBulkLoader) bulkLoader).abort();
-            indexHelper.close();
+        } catch (Throwable e) {
             throw new HyracksDataException(e);
         }
     }
@@ -85,7 +84,6 @@
             try {
                 bulkLoader.add(tuple);
             } catch (IndexException e) {
-                ((ITwoPCIndexBulkLoader) bulkLoader).abort();
                 throw new HyracksDataException(e);
             }
         }
@@ -93,17 +91,29 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            bulkLoader.end();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        } finally {
-            indexHelper.close();
+        if (index != null) {
+            try {
+                bulkLoader.end();
+            } catch (Throwable th) {
+                throw new HyracksDataException(th);
+            } finally {
+                try {
+                    indexHelper.close();
+                } finally {
+                    writer.close();
+                }
+            }
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        ((ITwoPCIndexBulkLoader) bulkLoader).abort();
+        if (index != null) {
+            try {
+                ((ITwoPCIndexBulkLoader) bulkLoader).abort();
+            } finally {
+                writer.fail();
+            }
+        }
     }
 }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
index a7844ce..be1603b 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
@@ -44,13 +44,12 @@
     private final IControlledAdapterFactory adapterFactory;
     private final INullWriterFactory iNullWriterFactory;
 
-    public ExternalLoopkupOperatorDiscriptor(IOperatorDescriptorRegistry spec,
-            IControlledAdapterFactory adapterFactory, RecordDescriptor outRecDesc,
-            ExternalBTreeDataflowHelperFactory externalFilesIndexDataFlowHelperFactory, boolean propagateInput,
-            IIndexLifecycleManagerProvider lcManagerProvider, IStorageManagerInterface storageManager,
-            IFileSplitProvider fileSplitProvider, int datasetId, double bloomFilterFalsePositiveRate,
-            ISearchOperationCallbackFactory searchOpCallbackFactory, boolean retainNull,
-            INullWriterFactory iNullWriterFactory) {
+    public ExternalLoopkupOperatorDiscriptor(IOperatorDescriptorRegistry spec, IControlledAdapterFactory adapterFactory,
+            RecordDescriptor outRecDesc, ExternalBTreeDataflowHelperFactory externalFilesIndexDataFlowHelperFactory,
+            boolean propagateInput, IIndexLifecycleManagerProvider lcManagerProvider,
+            IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider, int datasetId,
+            double bloomFilterFalsePositiveRate, ISearchOperationCallbackFactory searchOpCallbackFactory,
+            boolean retainNull, INullWriterFactory iNullWriterFactory) {
         super(spec, 1, 1, outRecDesc, storageManager, lcManagerProvider, fileSplitProvider,
                 new FilesIndexDescription().EXTERNAL_FILE_INDEX_TYPE_TRAITS,
                 new FilesIndexDescription().FILES_INDEX_COMP_FACTORIES, FilesIndexDescription.BLOOM_FILTER_FIELDS,
@@ -63,7 +62,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         // Create a file index accessor to be used for files lookup operations
         // Note that all file index accessors will use partition 0 since we only have 1 files index per NC 
         final ExternalFileIndexAccessor fileIndexAccessor = new ExternalFileIndexAccessor(
@@ -73,18 +72,22 @@
             // The adapter that uses the file index along with the coming tuples to access files in HDFS
             private final IControlledAdapter adapter = adapterFactory.createAdapter(ctx, fileIndexAccessor,
                     recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
+            private boolean indexOpen = false;
+            private boolean writerOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
                 //Open the file index accessor here
                 fileIndexAccessor.openIndex();
+                indexOpen = true;
                 try {
                     adapter.initialize(ctx, iNullWriterFactory);
-                } catch (Exception e) {
+                } catch (Throwable th) {
                     // close the files index
                     fileIndexAccessor.closeIndex();
-                    throw new HyracksDataException("error during opening a controlled adapter", e);
+                    throw new HyracksDataException(th);
                 }
+                writerOpen = true;
                 writer.open();
             }
 
@@ -92,13 +95,19 @@
             public void close() throws HyracksDataException {
                 try {
                     adapter.close(writer);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    throw new HyracksDataException("controlled adapter failed to close", e);
+                } catch (Throwable th) {
+                    throw new HyracksDataException(th);
                 } finally {
-                    //close the file index
-                    fileIndexAccessor.closeIndex();
-                    writer.close();
+                    try {
+                        if (indexOpen) {
+                            //close the file index
+                            fileIndexAccessor.closeIndex();
+                        }
+                    } finally {
+                        if (writerOpen) {
+                            writer.close();
+                        }
+                    }
                 }
             }
 
@@ -106,12 +115,12 @@
             public void fail() throws HyracksDataException {
                 try {
                     adapter.fail();
-                    writer.fail();
-                } catch (Exception e) {
-                    throw new HyracksDataException("controlled adapter failed to clean up", e);
+                } catch (Throwable th) {
+                    throw new HyracksDataException(th);
                 } finally {
-                    // close the open index
-                    fileIndexAccessor.closeIndex();
+                    if (writerOpen) {
+                        writer.fail();
+                    }
                 }
             }
 
@@ -119,11 +128,10 @@
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 try {
                     adapter.nextFrame(buffer, writer);
-                } catch (Exception e) {
-                    throw new HyracksDataException("controlled adapter failed to process a frame", e);
+                } catch (Throwable th) {
+                    throw new HyracksDataException(th);
                 }
             }
-
         };
     }
 }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index ad0bcb5..4321b6d 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -38,7 +38,6 @@
     private static final long serialVersionUID = 1L;
 
     private IAdapterFactory adapterFactory;
-    
 
     public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
             IAdapterFactory dataSourceAdapterFactory) {
@@ -50,19 +49,20 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
             @Override
             public void initialize() throws HyracksDataException {
-                writer.open();
                 IDatasourceAdapter adapter = null;
                 try {
+                    writer.open();
                     adapter = adapterFactory.createAdapter(ctx, partition);
                     adapter.start(partition, writer);
-                } catch (Exception e) {
-                    throw new HyracksDataException("exception during reading from external data source", e);
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
                 } finally {
                     writer.close();
                 }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
index f6d2287..d9d6bdf 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
@@ -43,6 +43,9 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
+/*
+ * This IFrameWriter doesn't follow the contract
+ */
 public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
@@ -97,8 +100,8 @@
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.connectionId = feedConnectionId;
-        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getFeedManager();
+        this.feedManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getFeedManager();
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.feedManager = runtimeCtx.getFeedManager();
@@ -125,11 +128,10 @@
     private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
         this.fta = new FrameTupleAccessor(recordDesc);
         this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
-                nPartitions);
+                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
 
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
-                runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
         coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
 
         feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
@@ -145,8 +147,8 @@
         this.inputSideHandler = feedRuntime.getInputHandler();
         this.inputSideHandler.setCoreOperator(coreOperator);
 
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
-                runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
         coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
         distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
 
@@ -217,8 +219,7 @@
             feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
 
             // deregister from connection manager
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
-                    ((FeedRuntime) feedRuntime).getRuntimeId());
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
         }
     }
 
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java
index 81f6aeb..97b3d6a 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java
@@ -46,12 +46,11 @@
     // We override the open function to search a specific version of the index
     @Override
     public void open() throws HyracksDataException {
+        writer.open();
         ExternalBTreeWithBuddyDataflowHelper dataFlowHelper = (ExternalBTreeWithBuddyDataflowHelper) indexHelper;
         accessor = new FrameTupleAccessor(inputRecDesc);
-        writer.open();
         dataFlowHelper.open();
         index = indexHelper.getIndexInstance();
-
         if (retainNull) {
             int fieldCount = getFieldCount();
             nullTupleBuild = new ArrayTupleBuilder(fieldCount);
@@ -67,7 +66,6 @@
         } else {
             nullTupleBuild = null;
         }
-
         ExternalBTreeWithBuddy externalIndex = (ExternalBTreeWithBuddy) index;
         try {
             searchPred = createSearchPredicate();
@@ -82,10 +80,8 @@
             if (retainInput) {
                 frameTuple = new FrameTupleReference();
             }
-        } catch (Exception e) {
-            indexHelper.close();
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         }
     }
-
 }
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java
index 9c7b020..3fb5609 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java
@@ -45,12 +45,11 @@
     // We override this method to specify the searched version of the index
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(inputRecDesc);
         writer.open();
+        accessor = new FrameTupleAccessor(inputRecDesc);
         indexHelper.open();
         ExternalRTreeDataflowHelper rTreeDataflowHelper = (ExternalRTreeDataflowHelper) indexHelper;
         index = indexHelper.getIndexInstance();
-
         if (retainNull) {
             int fieldCount = getFieldCount();
             nullTupleBuild = new ArrayTupleBuilder(fieldCount);
@@ -66,7 +65,6 @@
         } else {
             nullTupleBuild = null;
         }
-
         ExternalRTree rTreeIndex = (ExternalRTree) index;
         try {
             searchPred = createSearchPredicate();
@@ -82,7 +80,6 @@
                 frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
-            indexHelper.close();
             throw new HyracksDataException(e);
         }
     }
