more dataflow helper changes and cleanup to ease the introduction of index lifecycle management
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1711 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
index eaf1be1..f307c8a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDataflowHelper.java
@@ -30,12 +30,11 @@
}
@Override
- public ITreeIndex createIndexInstance() throws HyracksDataException {
+ public ITreeIndex getIndexInstance() throws HyracksDataException {
try {
return BTreeUtils.createBTree(opDesc.getStorageManager().getBufferCache(ctx), opDesc.getStorageManager()
.getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(), treeOpDesc
- .getTreeIndexComparatorFactories(), BTreeLeafFrameType.REGULAR_NSM, opDesc.getFileSplitProvider()
- .getFileSplits()[partition].getLocalFile());
+ .getTreeIndexComparatorFactories(), BTreeLeafFrameType.REGULAR_NSM, file);
} catch (BTreeException e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexDataflowHelper.java
new file mode 100644
index 0000000..92cf775
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.storage.am.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+
+public interface IIndexDataflowHelper {
+ public IIndex getIndexInstance() throws HyracksDataException;
+
+ public FileReference getFileReference();
+
+ public long getResourceID();
+}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
index ddca470..71760c9 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IIndexDataflowHelperFactory.java
@@ -18,8 +18,9 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
public interface IIndexDataflowHelperFactory extends Serializable {
- public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc,
+ public IIndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc,
final IHyracksTaskContext ctx, int partition);
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 3d29b06..3b9a356 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -17,61 +17,59 @@
import java.io.File;
import java.io.IOException;
-import java.util.List;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.common.file.IIndexArtifactMap;
-public abstract class IndexDataflowHelper {
- protected final int partition;
+public abstract class IndexDataflowHelper implements IIndexDataflowHelper {
+
protected final IIndexOperatorDescriptor opDesc;
protected final IHyracksTaskContext ctx;
protected final IIndexArtifactMap indexArtifactMap;
+ protected final IndexRegistry<IIndex> indexRegistry;
+ protected final FileReference file;
protected IIndex index;
- protected long resourceID = -1;
+ protected String baseDir;
+ protected String fullDir;
public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final IHyracksTaskContext ctx, int partition) {
this.opDesc = opDesc;
this.ctx = ctx;
- this.partition = partition;
this.indexArtifactMap = opDesc.getStorageManager().getIndexArtifactMap(ctx);
- }
+ this.indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
+ this.file = opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile();
- public void init(boolean forceCreate) throws HyracksDataException {
- IndexRegistry<IIndex> indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
- FileReference file = getFilereference();
- List<IODeviceHandle> ioDeviceHandles = ctx.getIOManager().getIODevices();
- String fullDir = ioDeviceHandles.get(0).getPath().toString();
+ fullDir = ctx.getIOManager().getIODevices().get(0).getPath().toString();
if (!fullDir.endsWith(File.separator)) {
fullDir += File.separator;
}
- String baseDir = file.getFile().getPath();
+ baseDir = file.getFile().getPath();
if (!baseDir.endsWith(File.separator)) {
baseDir += File.separator;
}
fullDir += baseDir;
+ }
- resourceID = indexArtifactMap.get(fullDir);
-
+ public void init(boolean forceCreate) throws HyracksDataException {
+ long resourceID = getResourceID();
// Create new index instance and register it.
synchronized (indexRegistry) {
// Check if the index has already been registered.
boolean register = false;
index = indexRegistry.get(resourceID);
if (index == null) {
- index = createIndexInstance();
+ index = getIndexInstance();
register = true;
}
if (forceCreate) {
index.create();
// Create new resourceId
try {
- resourceID = indexArtifactMap.create(baseDir, ioDeviceHandles);
+ resourceID = indexArtifactMap.create(baseDir, ctx.getIOManager().getIODevices());
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -83,29 +81,18 @@
}
}
- public abstract IIndex createIndexInstance() throws HyracksDataException;
+ public abstract IIndex getIndexInstance() throws HyracksDataException;
- public FileReference getFilereference() {
- IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
- return fileSplitProvider.getFileSplits()[partition].getLocalFile();
- }
-
- public void deinit() throws HyracksDataException {
+ @Override
+ public FileReference getFileReference() {
+ return file;
}
public IIndex getIndex() {
return index;
}
- public IHyracksTaskContext getHyracksTaskContext() {
- return ctx;
- }
-
- public IIndexOperatorDescriptor getOperatorDescriptor() {
- return opDesc;
- }
-
public long getResourceID() {
- return resourceID;
+ return indexArtifactMap.get(fullDir);
}
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index bb0689a..1289165 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -26,6 +26,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
public class TreeIndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+ private final AbstractTreeIndexOperatorDescriptor opDesc;
+ private final IHyracksTaskContext ctx;
private float fillFactor;
private final TreeIndexDataflowHelper treeIndexHelper;
private FrameTupleAccessor accessor;
@@ -38,8 +40,10 @@
public TreeIndexBulkLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider) {
- treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition);
+ this.opDesc = opDesc;
+ this.ctx = ctx;
+ this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+ .createIndexDataflowHelper(opDesc, ctx, partition);
this.fillFactor = fillFactor;
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
@@ -47,18 +51,14 @@
@Override
public void open() throws HyracksDataException {
- AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
- .getOperatorDescriptor();
RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
try {
treeIndexHelper.init(false);
treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
bulkLoader = treeIndex.createBulkLoader(fillFactor);
} catch (Exception e) {
// cleanup in case of failure
- System.out.println("help");
- treeIndexHelper.deinit();
throw new HyracksDataException(e);
}
}
@@ -79,8 +79,6 @@
bulkLoader.end();
} catch (Exception e) {
throw new HyracksDataException(e);
- } finally {
- treeIndexHelper.deinit();
}
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java
index 21348a0..8ad22db 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexCreateOperatorNodePushable.java
@@ -46,11 +46,7 @@
@Override
public void initialize() throws HyracksDataException {
- try {
- treeIndexHelper.init(true);
- } finally {
- treeIndexHelper.deinit();
- }
+ treeIndexHelper.init(true);
}
@Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
index 10d1077..6548261 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDataflowHelper.java
@@ -17,7 +17,6 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.impls.TreeDiskOrderScanCursor;
@@ -30,7 +29,7 @@
this.treeOpDesc = (ITreeIndexOperatorDescriptor) opDesc;
}
- public abstract ITreeIndex createIndexInstance() throws HyracksDataException;
+ public abstract IIndex getIndexInstance() throws HyracksDataException;
public ITreeIndexCursor createDiskOrderScanCursor(ITreeIndexFrame leafFrame) throws HyracksDataException {
return new TreeDiskOrderScanCursor(leafFrame);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 3c67fcd..8ffe1df 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -33,12 +33,14 @@
public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
private final AbstractTreeIndexOperatorDescriptor opDesc;
+ private final IHyracksTaskContext ctx;
private final TreeIndexDataflowHelper treeIndexHelper;
private ITreeIndex treeIndex;
public TreeIndexDiskOrderScanOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition) {
this.opDesc = opDesc;
+ this.ctx = ctx;
this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
.createIndexDataflowHelper(opDesc, ctx, partition);
}
@@ -58,9 +60,8 @@
try {
indexAccessor.diskOrderScan(cursor);
int fieldCount = treeIndex.getFieldCount();
- ByteBuffer frame = treeIndexHelper.getHyracksTaskContext().allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext()
- .getFrameSize());
+ ByteBuffer frame = ctx.allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
DataOutput dos = tb.getDataOutput();
@@ -101,6 +102,5 @@
@Override
public void deinitialize() throws HyracksDataException {
- treeIndexHelper.deinit();
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index d7a4b99..07a2726 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -33,6 +33,8 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
public class TreeIndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final AbstractTreeIndexOperatorDescriptor opDesc;
+ private final IHyracksTaskContext ctx;
private final TreeIndexDataflowHelper treeIndexHelper;
private FrameTupleAccessor accessor;
private final IRecordDescriptorProvider recordDescProvider;
@@ -47,6 +49,8 @@
public TreeIndexInsertUpdateDeleteOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
IRecordDescriptorProvider recordDescProvider, IndexOp op) {
+ this.opDesc = opDesc;
+ this.ctx = ctx;
this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
.createIndexDataflowHelper(opDesc, ctx, partition);
this.recordDescProvider = recordDescProvider;
@@ -56,11 +60,9 @@
@Override
public void open() throws HyracksDataException {
- AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
- .getOperatorDescriptor();
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
- writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+ writeBuffer = ctx.allocateFrame();
writer.open();
try {
treeIndexHelper.init(false);
@@ -74,8 +76,6 @@
frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
- // cleanup in case of failure
- treeIndexHelper.deinit();
throw new HyracksDataException(e);
}
}
@@ -130,11 +130,7 @@
@Override
public void close() throws HyracksDataException {
- try {
- writer.close();
- } finally {
- treeIndexHelper.deinit();
- }
+ writer.close();
}
@Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index a49e9b3..3451d08 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -38,6 +38,7 @@
public abstract class TreeIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
protected final AbstractTreeIndexOperatorDescriptor opDesc;
+ protected final IHyracksTaskContext ctx;
protected final TreeIndexDataflowHelper treeIndexHelper;
protected FrameTupleAccessor accessor;
@@ -59,9 +60,10 @@
public TreeIndexSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, IRecordDescriptorProvider recordDescProvider) {
this.opDesc = opDesc;
+ this.ctx = ctx;
this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
.createIndexDataflowHelper(opDesc, ctx, partition);
- this.retainInput = treeIndexHelper.getOperatorDescriptor().getRetainInput();
+ this.retainInput = opDesc.getRetainInput();
this.inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
}
@@ -75,17 +77,17 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
writer.open();
try {
treeIndexHelper.init(false);
treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
searchPred = createSearchPredicate();
- writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = ctx.allocateFrame();
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(writeBuffer, true);
ISearchOperationCallback searchCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(
treeIndexHelper.getResourceID());
@@ -95,7 +97,6 @@
frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
- treeIndexHelper.deinit();
throw new HyracksDataException(e);
}
}
@@ -144,18 +145,14 @@
@Override
public void close() throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ writer.close();
try {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
- writer.close();
- try {
- cursor.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- } finally {
- treeIndexHelper.deinit();
+ cursor.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
}
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
index df82359..b47049e 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorNodePushable.java
@@ -32,15 +32,18 @@
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class TreeIndexStatsOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
- private final TreeIndexDataflowHelper treeIndexHelper;
+ private final AbstractTreeIndexOperatorDescriptor opDesc;
private final IHyracksTaskContext ctx;
+ private final TreeIndexDataflowHelper treeIndexHelper;
private TreeIndexStatsGatherer statsGatherer;
public TreeIndexStatsOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition) {
- treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition);
+ this.opDesc = opDesc;
this.ctx = ctx;
+ this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+ .createIndexDataflowHelper(opDesc, ctx, partition);
+
}
@Override
@@ -58,10 +61,9 @@
writer.open();
treeIndexHelper.init(false);
ITreeIndex treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
- IBufferCache bufferCache = treeIndexHelper.getOperatorDescriptor().getStorageManager().getBufferCache(ctx);
- IFileMapProvider fileMapProvider = treeIndexHelper.getOperatorDescriptor().getStorageManager()
- .getFileMapProvider(ctx);
- int indexFileId = fileMapProvider.lookupFileId(treeIndexHelper.getFilereference());
+ IBufferCache bufferCache = opDesc.getStorageManager().getBufferCache(ctx);
+ IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
+ int indexFileId = fileMapProvider.lookupFileId(treeIndexHelper.getFileReference());
statsGatherer = new TreeIndexStatsGatherer(bufferCache, treeIndex.getFreePageManager(), indexFileId,
treeIndex.getRootPageId());
TreeIndexStats stats = statsGatherer.gatherStats(treeIndex.getLeafFrameFactory().createFrame(), treeIndex
@@ -81,11 +83,7 @@
}
FrameUtils.flushFrame(frame, writer);
} catch (Exception e) {
- try {
- treeIndexHelper.deinit();
- } finally {
- writer.fail();
- }
+ writer.fail();
} finally {
writer.close();
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index 3642da0..9b9ff2c 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -24,10 +24,13 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex;
public class InvertedIndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+ private final AbstractInvertedIndexOperatorDescriptor opDesc;
+ private final IHyracksTaskContext ctx;
private final InvertedIndexDataflowHelper invIndexDataflowHelper;
private InvertedIndex invIndex;
private IIndexBulkLoader bulkLoader;
@@ -39,6 +42,8 @@
public InvertedIndexBulkLoadOperatorNodePushable(AbstractInvertedIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider) {
+ this.opDesc = opDesc;
+ this.ctx = ctx;
this.invIndexDataflowHelper = new InvertedIndexDataflowHelper(opDesc, ctx, partition);
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
@@ -46,25 +51,17 @@
@Override
public void open() throws HyracksDataException {
- AbstractInvertedIndexOperatorDescriptor opDesc = (AbstractInvertedIndexOperatorDescriptor) invIndexDataflowHelper
- .getOperatorDescriptor();
RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(invIndexDataflowHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
// Inverted Index.
- try {
- invIndexDataflowHelper.init(false);
- invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
+ invIndexDataflowHelper.init(false);
+ invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
+ try {
bulkLoader = invIndex.createBulkLoader(BTree.DEFAULT_FILL_FACTOR);
- } catch (Exception e) {
- // Cleanup in case of failure.
- invIndexDataflowHelper.deinit();
- if (e instanceof HyracksDataException) {
- throw (HyracksDataException) e;
- } else {
- throw new HyracksDataException(e);
- }
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
}
}
@@ -80,13 +77,7 @@
@Override
public void close() throws HyracksDataException {
- try {
- bulkLoader.end();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- } finally {
- invIndexDataflowHelper.deinit();
- }
+ bulkLoader.end();
}
@Override
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java
index bd25746..ef8ef42 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexCreateOperatorNodePushable.java
@@ -44,12 +44,7 @@
@Override
public void initialize() throws HyracksDataException {
- // Inverted Index.
- try {
- invIndexDataflowHelper.init(true);
- } finally {
- invIndexDataflowHelper.deinit();
- }
+ invIndexDataflowHelper.init(true);
}
@Override
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
index 7409b2a..969e099 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexDataflowHelper.java
@@ -16,8 +16,6 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
@@ -32,21 +30,14 @@
super(opDesc, ctx, partition);
}
- public FileReference getFilereference() {
- AbstractInvertedIndexOperatorDescriptor invIndexOpDesc = (AbstractInvertedIndexOperatorDescriptor) opDesc;
- IFileSplitProvider fileSplitProvider = invIndexOpDesc.getInvListsFileSplitProvider();
- return fileSplitProvider.getFileSplits()[partition].getLocalFile();
- }
-
@Override
- public IIndex createIndexInstance() throws HyracksDataException {
+ public IIndex getIndexInstance() throws HyracksDataException {
IInvertedIndexOperatorDescriptor invIndexOpDesc = (IInvertedIndexOperatorDescriptor) opDesc;
IInvertedListBuilder invListBuilder = new FixedSizeElementInvertedListBuilder(
invIndexOpDesc.getInvListsTypeTraits());
return new InvertedIndex(opDesc.getStorageManager().getBufferCache(ctx), opDesc.getStorageManager()
.getFileMapProvider(ctx), invListBuilder, invIndexOpDesc.getInvListsTypeTraits(),
invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTreeIndexTypeTraits(),
- invIndexOpDesc.getTreeIndexComparatorFactories(),
- opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile());
+ invIndexOpDesc.getTreeIndexComparatorFactories(), file);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
index 4b932b1..b9dd1ea 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
@@ -38,6 +38,8 @@
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.OccurrenceThresholdPanicException;
public class InvertedIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final AbstractInvertedIndexOperatorDescriptor opDesc;
+ private final IHyracksTaskContext ctx;
private final InvertedIndexDataflowHelper invIndexDataflowHelper;
private final int queryField;
private FrameTupleAccessor accessor;
@@ -54,45 +56,34 @@
private ArrayTupleBuilder tb;
private DataOutput dos;
- private final AbstractInvertedIndexOperatorDescriptor opDesc;
private final boolean retainInput;
public InvertedIndexSearchOperatorNodePushable(AbstractInvertedIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, int queryField, IInvertedIndexSearchModifier searchModifier,
IRecordDescriptorProvider recordDescProvider) {
this.opDesc = opDesc;
+ this.ctx = ctx;
this.invIndexDataflowHelper = new InvertedIndexDataflowHelper(opDesc, ctx, partition);
this.queryField = queryField;
this.searchPred = new InvertedIndexSearchPredicate(opDesc.getTokenizerFactory().createTokenizer(),
searchModifier);
this.recordDescProvider = recordDescProvider;
- this.retainInput = invIndexDataflowHelper.getOperatorDescriptor().getRetainInput();
+ this.retainInput = opDesc.getRetainInput();
}
@Override
public void open() throws HyracksDataException {
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(invIndexDataflowHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
+ accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
tuple = new FrameTupleReference();
- // Inverted Index.
- try {
- invIndexDataflowHelper.init(false);
- invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
- } catch (Exception e) {
- // Cleanup in case of failure.
- invIndexDataflowHelper.deinit();
- if (e instanceof HyracksDataException) {
- throw (HyracksDataException) e;
- } else {
- throw new HyracksDataException(e);
- }
- }
+ invIndexDataflowHelper.init(false);
+ invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
- writeBuffer = invIndexDataflowHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = ctx.allocateFrame();
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(invIndexDataflowHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(writeBuffer, true);
indexAccessor = invIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
@@ -156,13 +147,9 @@
@Override
public void close() throws HyracksDataException {
- try {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
- writer.close();
- } finally {
- invIndexDataflowHelper.deinit();
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
}
+ writer.close();
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
index 0582dbd..fbfd4b8 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDataflowHelper.java
@@ -17,8 +17,6 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -66,15 +64,10 @@
}
@Override
- public ITreeIndex createIndexInstance() throws HyracksDataException {
+ public ITreeIndex getIndexInstance() throws HyracksDataException {
ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
InMemoryBufferCache memBufferCache = new InMemoryBufferCache(new HeapBufferAllocator(), memPageSize,
memNumPages, new TransientFileMapManager());
- IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
- FileReference file = fileSplitProvider.getFileSplits()[partition].getLocalFile();
- if (file.getFile().exists() && !file.getFile().isDirectory()) {
- file.delete();
- }
InMemoryFreePageManager memFreePageManager = new InMemoryFreePageManager(memNumPages, metaDataFrameFactory);
return LSMBTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ctx.getIOManager(), file, opDesc
.getStorageManager().getBufferCache(ctx), opDesc.getStorageManager().getFileMapProvider(ctx),
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
index d311e56..b021300 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/dataflow/AbstractLSMRTreeDataflowHelper.java
@@ -21,7 +21,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
@@ -83,15 +82,10 @@
}
@Override
- public ITreeIndex createIndexInstance() throws HyracksDataException {
+ public ITreeIndex getIndexInstance() throws HyracksDataException {
ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
InMemoryBufferCache memBufferCache = new LSMRTreeInMemoryBufferCache(new HeapBufferAllocator(), memPageSize,
memNumPages);
- IFileSplitProvider fileSplitProvider = opDesc.getFileSplitProvider();
- FileReference file = fileSplitProvider.getFileSplits()[partition].getLocalFile();
- if (file.getFile().exists() && !file.getFile().isDirectory()) {
- file.delete();
- }
InMemoryFreePageManager memFreePageManager = new LSMRTreeInMemoryFreePageManager(memNumPages,
metaDataFrameFactory);
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
index c3660d1..f513460 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeDataflowHelper.java
@@ -37,10 +37,9 @@
}
@Override
- public ITreeIndex createIndexInstance() throws HyracksDataException {
+ public ITreeIndex getIndexInstance() throws HyracksDataException {
return RTreeUtils.createRTree(treeOpDesc.getStorageManager().getBufferCache(ctx), treeOpDesc
.getStorageManager().getFileMapProvider(ctx), treeOpDesc.getTreeIndexTypeTraits(),
- valueProviderFactories, treeOpDesc.getTreeIndexComparatorFactories(), rtreePolicyType, treeOpDesc
- .getFileSplitProvider().getFileSplits()[partition].getLocalFile());
+ valueProviderFactories, treeOpDesc.getTreeIndexComparatorFactories(), rtreePolicyType, file);
}
}