Modified RTree search operator to extend the common TreeIndexSearchOperator.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1195 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index fd65c20..2de7c79 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -15,153 +15,39 @@
package edu.uci.ics.hyracks.storage.am.rtree.dataflow;
-import java.io.DataOutput;
-import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexSearchOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
-import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
import edu.uci.ics.hyracks.storage.am.rtree.util.RTreeUtils;
-public class RTreeSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- private TreeIndexDataflowHelper treeIndexHelper;
- private FrameTupleAccessor accessor;
-
- private ByteBuffer writeBuffer;
- private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
- private DataOutput dos;
-
- private RTree rtree;
- private PermutingFrameTupleReference searchKey;
- private SearchPredicate searchPred;
- private MultiComparator cmp;
- private ITreeIndexCursor cursor;
- private ITreeIndexFrame interiorFrame;
- private ITreeIndexFrame leafFrame;
- private ITreeIndexAccessor indexAccessor;
-
- private RecordDescriptor recDesc;
+public class RTreeSearchOperatorNodePushable extends TreeIndexSearchOperatorNodePushable {
+ protected PermutingFrameTupleReference searchKey;
+ protected MultiComparator cmp;
public RTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
- treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition, false);
- this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ super(opDesc, ctx, partition, recordDescProvider);
if (keyFields != null && keyFields.length > 0) {
searchKey = new PermutingFrameTupleReference();
searchKey.setFieldPermutation(keyFields);
}
}
- @Override
- public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
- try {
- treeIndexHelper.init();
- writer.open();
- try {
- rtree = (RTree) treeIndexHelper.getIndex();
- interiorFrame = rtree.getInteriorFrameFactory().createFrame();
- leafFrame = rtree.getLeafFrameFactory().createFrame();
- cursor = new RTreeSearchCursor((IRTreeInteriorFrame) interiorFrame, (IRTreeLeafFrame) leafFrame);
- cmp = RTreeUtils.getSearchMultiComparator(rtree.getComparatorFactories(), searchKey);
+ @Override
+ protected ISearchPredicate createSearchPredicate() {
+ RTree rtree = (RTree) treeIndex;
+ cmp = RTreeUtils.getSearchMultiComparator(rtree.getComparatorFactories(), searchKey);
+ return new SearchPredicate(searchKey, cmp);
+ }
- searchPred = new SearchPredicate(searchKey, cmp);
- writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(rtree.getFieldCount());
- dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
- appender.reset(writeBuffer, true);
- indexAccessor = rtree.createAccessor();
- } catch (Exception e) {
- writer.fail();
- throw e;
- }
-
- } catch (Exception e) {
- treeIndexHelper.deinit();
- throw new HyracksDataException(e);
- }
- }
-
- private void writeSearchResults() throws Exception {
- while (cursor.hasNext()) {
- tb.reset();
- cursor.next();
-
- ITupleReference frameTuple = cursor.getTuple();
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
-
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(writeBuffer, writer);
- appender.reset(writeBuffer, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
-
- int tupleCount = accessor.getTupleCount();
- try {
- for (int i = 0; i < tupleCount; i++) {
- searchKey.reset(accessor, i);
-
- cursor.reset();
- indexAccessor.search(cursor, searchPred);
- writeSearchResults();
- }
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
- writer.close();
- try {
- cursor.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- } finally {
- treeIndexHelper.deinit();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
+ @Override
+ protected void resetSearchPredicate(int tupleIndex) {
+ searchKey.reset(accessor, tupleIndex);
+ }
}
\ No newline at end of file