Fixed problem with writing to unopened writer

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@552 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index c9a4d6b..a1e5162 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -45,157 +45,149 @@
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 
-public class RTreeSearchOperatorNodePushable extends
-		AbstractUnaryInputUnaryOutputOperatorNodePushable {
-	private TreeIndexOpHelper treeIndexOpHelper;
-	private FrameTupleAccessor accessor;
+public class RTreeSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+    private TreeIndexOpHelper treeIndexOpHelper;
+    private FrameTupleAccessor accessor;
 
-	private ByteBuffer writeBuffer;
-	private FrameTupleAppender appender;
-	private ArrayTupleBuilder tb;
-	private DataOutput dos;
+    private ByteBuffer writeBuffer;
+    private FrameTupleAppender appender;
+    private ArrayTupleBuilder tb;
+    private DataOutput dos;
 
-	private RTree rtree;
-	private PermutingFrameTupleReference searchKey;
-	private SearchPredicate searchPred;
-	private MultiComparator cmp;
-	private ITreeIndexCursor cursor;
-	private ITreeIndexFrame interiorFrame;
-	private ITreeIndexFrame leafFrame;
-	private RTreeOpContext opCtx;
+    private RTree rtree;
+    private PermutingFrameTupleReference searchKey;
+    private SearchPredicate searchPred;
+    private MultiComparator cmp;
+    private ITreeIndexCursor cursor;
+    private ITreeIndexFrame interiorFrame;
+    private ITreeIndexFrame leafFrame;
+    private RTreeOpContext opCtx;
 
-	private RecordDescriptor recDesc;
+    private RecordDescriptor recDesc;
 
-	public RTreeSearchOperatorNodePushable(
-			AbstractTreeIndexOperatorDescriptor opDesc,
-			IHyracksTaskContext ctx, int partition,
-			IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
-		treeIndexOpHelper = opDesc.getTreeIndexOpHelperFactory()
-				.createTreeIndexOpHelper(opDesc, ctx, partition,
-						IndexHelperOpenMode.OPEN);
+    public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
+        treeIndexOpHelper = opDesc.getTreeIndexOpHelperFactory().createTreeIndexOpHelper(opDesc, ctx, partition,
+                IndexHelperOpenMode.OPEN);
 
-		this.recDesc = recordDescProvider.getInputRecordDescriptor(
-				opDesc.getOperatorId(), 0);
-		if (keyFields != null && keyFields.length > 0) {
-			searchKey = new PermutingFrameTupleReference();
-			searchKey.setFieldPermutation(keyFields);
-		}
-	}
+        this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+        if (keyFields != null && keyFields.length > 0) {
+            searchKey = new PermutingFrameTupleReference();
+            searchKey.setFieldPermutation(keyFields);
+        }
+    }
 
-	@Override
-	public void open() throws HyracksDataException {
-		AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
-				.getOperatorDescriptor();
-		accessor = new FrameTupleAccessor(treeIndexOpHelper
-				.getHyracksTaskContext().getFrameSize(), recDesc);
+    @Override
+    public void open() throws HyracksDataException {
+        AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
+                .getOperatorDescriptor();
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
 
-		interiorFrame = opDesc.getTreeIndexInteriorFactory().createFrame();
-		leafFrame = opDesc.getTreeIndexLeafFactory().createFrame();
-		cursor = new RTreeSearchCursor((IRTreeInteriorFrame) interiorFrame,
-				(IRTreeLeafFrame) leafFrame);
+        interiorFrame = opDesc.getTreeIndexInteriorFactory().createFrame();
+        leafFrame = opDesc.getTreeIndexLeafFactory().createFrame();
+        cursor = new RTreeSearchCursor((IRTreeInteriorFrame) interiorFrame, (IRTreeLeafFrame) leafFrame);
 
-		try {
+        try {
 
-			treeIndexOpHelper.init();
-			rtree = (RTree) treeIndexOpHelper.getTreeIndex();
+            treeIndexOpHelper.init();
+            writer.open();
+            try {
+                rtree = (RTree) treeIndexOpHelper.getTreeIndex();
 
-			int keySearchFields = rtree.getCmp().getComparators().length;
+                int keySearchFields = rtree.getCmp().getComparators().length;
 
-			IBinaryComparator[] keySearchComparators = new IBinaryComparator[keySearchFields];
-			for (int i = 0; i < keySearchFields; i++) {
-				keySearchComparators[i] = rtree.getCmp().getComparators()[i];
-			}
+                IBinaryComparator[] keySearchComparators = new IBinaryComparator[keySearchFields];
+                for (int i = 0; i < keySearchFields; i++) {
+                    keySearchComparators[i] = rtree.getCmp().getComparators()[i];
+                }
 
-			IPrimitiveValueProvider[] keyValueProvider = new IPrimitiveValueProvider[keySearchFields];
-			for (int i = 0; i < keySearchFields; i++) {
-				keyValueProvider[i] = rtree.getCmp().getValueProviders()[i];
-			}
+                IPrimitiveValueProvider[] keyValueProvider = new IPrimitiveValueProvider[keySearchFields];
+                for (int i = 0; i < keySearchFields; i++) {
+                    keyValueProvider[i] = rtree.getCmp().getValueProviders()[i];
+                }
 
-			cmp = new MultiComparator(rtree.getCmp().getTypeTraits(),
-					keySearchComparators, keyValueProvider);
+                cmp = new MultiComparator(rtree.getCmp().getTypeTraits(), keySearchComparators, keyValueProvider);
 
-			searchPred = new SearchPredicate(searchKey, cmp);
+                searchPred = new SearchPredicate(searchKey, cmp);
 
-			writeBuffer = treeIndexOpHelper.getHyracksTaskContext()
-					.allocateFrame();
-			tb = new ArrayTupleBuilder(rtree.getCmp().getFieldCount());
-			dos = tb.getDataOutput();
-			appender = new FrameTupleAppender(treeIndexOpHelper
-					.getHyracksTaskContext().getFrameSize());
-			appender.reset(writeBuffer, true);
+                writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+                tb = new ArrayTupleBuilder(rtree.getCmp().getFieldCount());
+                dos = tb.getDataOutput();
+                appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+                appender.reset(writeBuffer, true);
 
-			opCtx = rtree.createOpContext(IndexOp.SEARCH,
-					treeIndexOpHelper.getLeafFrame(),
-					treeIndexOpHelper.getInteriorFrame(), null);
+                opCtx = rtree.createOpContext(IndexOp.SEARCH, treeIndexOpHelper.getLeafFrame(),
+                        treeIndexOpHelper.getInteriorFrame(), null);
+            } catch (Exception e) {
+                writer.fail();
+                throw e;
+            }
 
-		} catch (Exception e) {
-			treeIndexOpHelper.deinit();
-		}
-	}
+        } catch (Exception e) {
+            treeIndexOpHelper.deinit();
+            throw new HyracksDataException(e);
+        }
+    }
 
-	private void writeSearchResults() throws Exception {
-		while (cursor.hasNext()) {
-			tb.reset();
-			cursor.next();
+    private void writeSearchResults() throws Exception {
+        while (cursor.hasNext()) {
+            tb.reset();
+            cursor.next();
 
-			ITupleReference frameTuple = cursor.getTuple();
-			for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-				dos.write(frameTuple.getFieldData(i),
-						frameTuple.getFieldStart(i),
-						frameTuple.getFieldLength(i));
-				tb.addFieldEndOffset();
-			}
+            ITupleReference frameTuple = cursor.getTuple();
+            for (int i = 0; i < frameTuple.getFieldCount(); i++) {
+                dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+                tb.addFieldEndOffset();
+            }
 
-			if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0,
-					tb.getSize())) {
-				FrameUtils.flushFrame(writeBuffer, writer);
-				appender.reset(writeBuffer, true);
-				if (!appender.append(tb.getFieldEndOffsets(),
-						tb.getByteArray(), 0, tb.getSize())) {
-					throw new IllegalStateException();
-				}
-			}
-		}
-	}
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+                appender.reset(writeBuffer, true);
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+    }
 
-	@Override
-	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-		accessor.reset(buffer);
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
 
-		int tupleCount = accessor.getTupleCount();
-		try {
-			for (int i = 0; i < tupleCount; i++) {
-				searchKey.reset(accessor, i);
+        int tupleCount = accessor.getTupleCount();
+        try {
+            for (int i = 0; i < tupleCount; i++) {
+                searchKey.reset(accessor, i);
 
-				searchPred.setSearchKey(searchKey);
-				cursor.reset();
-				rtree.search(cursor, searchPred, opCtx);
-				writeSearchResults();
-			}
-		} catch (Exception e) {
-			throw new HyracksDataException(e);
-		}
-	}
+                searchPred.setSearchKey(searchKey);
+                cursor.reset();
+                rtree.search(cursor, searchPred, opCtx);
+                writeSearchResults();
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 
-	@Override
-	public void close() throws HyracksDataException {
-		try {
-			if (appender.getTupleCount() > 0) {
-				FrameUtils.flushFrame(writeBuffer, writer);
-			}
-			writer.close();
-			try {
-				cursor.close();
-			} catch (Exception e) {
-				throw new HyracksDataException(e);
-			}
-		} finally {
-			treeIndexOpHelper.deinit();
-		}
-	}
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(writeBuffer, writer);
+            }
+            writer.close();
+            try {
+                cursor.close();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        } finally {
+            treeIndexOpHelper.deinit();
+        }
+    }
 
-	@Override
-	public void fail() throws HyracksDataException {
-		writer.fail();
-	}
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
 }
\ No newline at end of file