Changed the IFrameWriter Contract

Updated existing operators to conform to the new contract.
These operators are either index operators or Feed operators.
The rest of the operator already follow the new contract.

Change-Id: Ibcebe876340a25be0f561945582a95211c140e10
Reviewed-on: https://asterix-gerrit.ics.uci.edu/552
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 0daee1e..48bb6a9 100644
--- a/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/asterix-app/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -44,13 +44,16 @@
     @Override
     public void initialize() throws HyracksDataException {
         FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        if (fieldSlots != null && tupleData != null && tupleSize > 0)
+        if (fieldSlots != null && tupleData != null && tupleSize > 0) {
             appender.append(fieldSlots, tupleData, 0, tupleSize);
-        writer.open();
-        try {
-            appender.flush(writer, true);
         }
-        finally {
+        try {
+            writer.open();
+            appender.flush(writer, true);
+        } catch (Throwable th) {
+            writer.fail();
+            throw new HyracksDataException(th);
+        } finally {
             writer.close();
         }
     }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 4118f5e..51168ae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -40,6 +40,7 @@
 public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     private final boolean isPrimary;
+    private AbstractLSMIndex lsmIndex;
 
     public boolean isPrimary() {
         return isPrimary;
@@ -57,10 +58,10 @@
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
-        writer.open();
         indexHelper.open();
-        AbstractLSMIndex lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
+        lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
+            writer.open();
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
@@ -69,15 +70,11 @@
                 tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
                 frameTuple = new FrameTupleReference();
             }
-
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                     .getApplicationContext().getApplicationObject();
-
             AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
-
-        } catch (Exception e) {
-            indexHelper.close();
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         }
     }
 
@@ -114,18 +111,34 @@
                         }
                         break;
                     default: {
-                        throw new HyracksDataException("Unsupported operation " + op
-                                + " in tree index InsertDelete operator");
+                        throw new HyracksDataException(
+                                "Unsupported operation " + op + " in tree index InsertDelete operator");
                     }
                 }
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         }
         writeBuffer.ensureFrameSize(buffer.capacity());
         FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
         FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
     }
 
+    @Override
+    public void close() throws HyracksDataException {
+        if (lsmIndex != null) {
+            try {
+                indexHelper.close();
+            } finally {
+                writer.close();
+            }
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        if (lsmIndex != null) {
+            writer.fail();
+        }
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
index f5f89f7..d0e371e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/feeds/DistributeFeedFrameWriter.java
@@ -98,8 +98,11 @@
 
     @Override
     public void close() throws HyracksDataException {
-        frameDistributor.close();
-        writer.close();
+        try {
+            frameDistributor.close();
+        } finally {
+            writer.close();
+        }
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index d8865b4..cc7a212 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -60,6 +60,7 @@
         indexHelper.open();
         index = indexHelper.getIndexInstance();
         try {
+            writer.open();
             // Transactional BulkLoader
             bulkLoader = ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length,
                     checkIfEmptyIndex);
@@ -69,9 +70,7 @@
                 filesIndexDescription.getBuddyBTreeTupleFromFileNumber(deleteTuple, buddyBTreeTupleBuilder, fileNumber);
                 ((ITwoPCIndexBulkLoader) bulkLoader).delete(deleteTuple);
             }
-        } catch (Exception e) {
-            ((ITwoPCIndexBulkLoader) bulkLoader).abort();
-            indexHelper.close();
+        } catch (Throwable e) {
             throw new HyracksDataException(e);
         }
     }
@@ -85,7 +84,6 @@
             try {
                 bulkLoader.add(tuple);
             } catch (IndexException e) {
-                ((ITwoPCIndexBulkLoader) bulkLoader).abort();
                 throw new HyracksDataException(e);
             }
         }
@@ -93,17 +91,29 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            bulkLoader.end();
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        } finally {
-            indexHelper.close();
+        if (index != null) {
+            try {
+                bulkLoader.end();
+            } catch (Throwable th) {
+                throw new HyracksDataException(th);
+            } finally {
+                try {
+                    indexHelper.close();
+                } finally {
+                    writer.close();
+                }
+            }
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        ((ITwoPCIndexBulkLoader) bulkLoader).abort();
+        if (index != null) {
+            try {
+                ((ITwoPCIndexBulkLoader) bulkLoader).abort();
+            } finally {
+                writer.fail();
+            }
+        }
     }
 }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
index a7844ce..be1603b 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
@@ -44,13 +44,12 @@
     private final IControlledAdapterFactory adapterFactory;
     private final INullWriterFactory iNullWriterFactory;
 
-    public ExternalLoopkupOperatorDiscriptor(IOperatorDescriptorRegistry spec,
-            IControlledAdapterFactory adapterFactory, RecordDescriptor outRecDesc,
-            ExternalBTreeDataflowHelperFactory externalFilesIndexDataFlowHelperFactory, boolean propagateInput,
-            IIndexLifecycleManagerProvider lcManagerProvider, IStorageManagerInterface storageManager,
-            IFileSplitProvider fileSplitProvider, int datasetId, double bloomFilterFalsePositiveRate,
-            ISearchOperationCallbackFactory searchOpCallbackFactory, boolean retainNull,
-            INullWriterFactory iNullWriterFactory) {
+    public ExternalLoopkupOperatorDiscriptor(IOperatorDescriptorRegistry spec, IControlledAdapterFactory adapterFactory,
+            RecordDescriptor outRecDesc, ExternalBTreeDataflowHelperFactory externalFilesIndexDataFlowHelperFactory,
+            boolean propagateInput, IIndexLifecycleManagerProvider lcManagerProvider,
+            IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider, int datasetId,
+            double bloomFilterFalsePositiveRate, ISearchOperationCallbackFactory searchOpCallbackFactory,
+            boolean retainNull, INullWriterFactory iNullWriterFactory) {
         super(spec, 1, 1, outRecDesc, storageManager, lcManagerProvider, fileSplitProvider,
                 new FilesIndexDescription().EXTERNAL_FILE_INDEX_TYPE_TRAITS,
                 new FilesIndexDescription().FILES_INDEX_COMP_FACTORIES, FilesIndexDescription.BLOOM_FILTER_FIELDS,
@@ -63,7 +62,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         // Create a file index accessor to be used for files lookup operations
         // Note that all file index accessors will use partition 0 since we only have 1 files index per NC 
         final ExternalFileIndexAccessor fileIndexAccessor = new ExternalFileIndexAccessor(
@@ -73,18 +72,22 @@
             // The adapter that uses the file index along with the coming tuples to access files in HDFS
             private final IControlledAdapter adapter = adapterFactory.createAdapter(ctx, fileIndexAccessor,
                     recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
+            private boolean indexOpen = false;
+            private boolean writerOpen = false;
 
             @Override
             public void open() throws HyracksDataException {
                 //Open the file index accessor here
                 fileIndexAccessor.openIndex();
+                indexOpen = true;
                 try {
                     adapter.initialize(ctx, iNullWriterFactory);
-                } catch (Exception e) {
+                } catch (Throwable th) {
                     // close the files index
                     fileIndexAccessor.closeIndex();
-                    throw new HyracksDataException("error during opening a controlled adapter", e);
+                    throw new HyracksDataException(th);
                 }
+                writerOpen = true;
                 writer.open();
             }
 
@@ -92,13 +95,19 @@
             public void close() throws HyracksDataException {
                 try {
                     adapter.close(writer);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    throw new HyracksDataException("controlled adapter failed to close", e);
+                } catch (Throwable th) {
+                    throw new HyracksDataException(th);
                 } finally {
-                    //close the file index
-                    fileIndexAccessor.closeIndex();
-                    writer.close();
+                    try {
+                        if (indexOpen) {
+                            //close the file index
+                            fileIndexAccessor.closeIndex();
+                        }
+                    } finally {
+                        if (writerOpen) {
+                            writer.close();
+                        }
+                    }
                 }
             }
 
@@ -106,12 +115,12 @@
             public void fail() throws HyracksDataException {
                 try {
                     adapter.fail();
-                    writer.fail();
-                } catch (Exception e) {
-                    throw new HyracksDataException("controlled adapter failed to clean up", e);
+                } catch (Throwable th) {
+                    throw new HyracksDataException(th);
                 } finally {
-                    // close the open index
-                    fileIndexAccessor.closeIndex();
+                    if (writerOpen) {
+                        writer.fail();
+                    }
                 }
             }
 
@@ -119,11 +128,10 @@
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 try {
                     adapter.nextFrame(buffer, writer);
-                } catch (Exception e) {
-                    throw new HyracksDataException("controlled adapter failed to process a frame", e);
+                } catch (Throwable th) {
+                    throw new HyracksDataException(th);
                 }
             }
-
         };
     }
 }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index ad0bcb5..4321b6d 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -38,7 +38,6 @@
     private static final long serialVersionUID = 1L;
 
     private IAdapterFactory adapterFactory;
-    
 
     public ExternalDataScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
             IAdapterFactory dataSourceAdapterFactory) {
@@ -50,19 +49,20 @@
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
-            throws HyracksDataException {
+                    throws HyracksDataException {
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
             @Override
             public void initialize() throws HyracksDataException {
-                writer.open();
                 IDatasourceAdapter adapter = null;
                 try {
+                    writer.open();
                     adapter = adapterFactory.createAdapter(ctx, partition);
                     adapter.start(partition, writer);
-                } catch (Exception e) {
-                    throw new HyracksDataException("exception during reading from external data source", e);
+                } catch (Throwable th) {
+                    writer.fail();
+                    throw new HyracksDataException(th);
                 } finally {
                     writer.close();
                 }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
index f6d2287..d9d6bdf 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
@@ -43,6 +43,9 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
+/*
+ * This IFrameWriter doesn't follow the contract
+ */
 public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
     private static final Logger LOGGER = Logger.getLogger(FeedMetaComputeNodePushable.class.getName());
@@ -97,8 +100,8 @@
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.connectionId = feedConnectionId;
-        this.feedManager = ((IAsterixAppRuntimeContext) (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject()).getFeedManager();
+        this.feedManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject()).getFeedManager();
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.feedManager = runtimeCtx.getFeedManager();
@@ -125,11 +128,10 @@
     private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception {
         this.fta = new FrameTupleAccessor(recordDesc);
         this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator,
-                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager,
-                nPartitions);
+                policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions);
 
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
-                runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
         coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
 
         feedRuntime = new SubscribableRuntime(connectionId.getFeedId(), runtimeId, inputSideHandler, distributeWriter,
@@ -145,8 +147,8 @@
         this.inputSideHandler = feedRuntime.getInputHandler();
         this.inputSideHandler.setCoreOperator(coreOperator);
 
-        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer,
-                runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
+        DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(),
+                writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager);
         coreOperator.setOutputFrameWriter(0, distributeWriter, recordDesc);
         distributeWriter.subscribeFeed(policyEnforcer.getFeedPolicyAccessor(), writer, connectionId);
 
@@ -217,8 +219,7 @@
             feedManager.getFeedSubscriptionManager().deregisterFeedSubscribableRuntime(runtimeId);
 
             // deregister from connection manager
-            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
-                    ((FeedRuntime) feedRuntime).getRuntimeId());
+            feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId, feedRuntime.getRuntimeId());
         }
     }
 
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java
index 81f6aeb..97b3d6a 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalBTreeSearchOperatorNodePushable.java
@@ -46,12 +46,11 @@
     // We override the open function to search a specific version of the index
     @Override
     public void open() throws HyracksDataException {
+        writer.open();
         ExternalBTreeWithBuddyDataflowHelper dataFlowHelper = (ExternalBTreeWithBuddyDataflowHelper) indexHelper;
         accessor = new FrameTupleAccessor(inputRecDesc);
-        writer.open();
         dataFlowHelper.open();
         index = indexHelper.getIndexInstance();
-
         if (retainNull) {
             int fieldCount = getFieldCount();
             nullTupleBuild = new ArrayTupleBuilder(fieldCount);
@@ -67,7 +66,6 @@
         } else {
             nullTupleBuild = null;
         }
-
         ExternalBTreeWithBuddy externalIndex = (ExternalBTreeWithBuddy) index;
         try {
             searchPred = createSearchPredicate();
@@ -82,10 +80,8 @@
             if (retainInput) {
                 frameTuple = new FrameTupleReference();
             }
-        } catch (Exception e) {
-            indexHelper.close();
-            throw new HyracksDataException(e);
+        } catch (Throwable th) {
+            throw new HyracksDataException(th);
         }
     }
-
 }
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java
index 9c7b020..3fb5609 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/external/ExternalRTreeSearchOperatorNodePushable.java
@@ -45,12 +45,11 @@
     // We override this method to specify the searched version of the index
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(inputRecDesc);
         writer.open();
+        accessor = new FrameTupleAccessor(inputRecDesc);
         indexHelper.open();
         ExternalRTreeDataflowHelper rTreeDataflowHelper = (ExternalRTreeDataflowHelper) indexHelper;
         index = indexHelper.getIndexInstance();
-
         if (retainNull) {
             int fieldCount = getFieldCount();
             nullTupleBuild = new ArrayTupleBuilder(fieldCount);
@@ -66,7 +65,6 @@
         } else {
             nullTupleBuild = null;
         }
-
         ExternalRTree rTreeIndex = (ExternalRTree) index;
         try {
             searchPred = createSearchPredicate();
@@ -82,7 +80,6 @@
                 frameTuple = new FrameTupleReference();
             }
         } catch (Exception e) {
-            indexHelper.close();
             throw new HyracksDataException(e);
         }
     }