moved opcallbacks out of dataflowhelpers and did some minor cleanup
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1705 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 9c295d7..3d29b06 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -15,67 +15,54 @@
package edu.uci.ics.hyracks.storage.am.common.dataflow;
+import java.io.File;
import java.io.IOException;
import java.util.List;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.api.IOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.common.file.IIndexArtifactMap;
public abstract class IndexDataflowHelper {
- protected IIndex index;
- protected int indexFileId = -1;
-
protected final int partition;
protected final IIndexOperatorDescriptor opDesc;
protected final IHyracksTaskContext ctx;
- protected transient IModificationOperationCallback modificationOperationCallback;
- protected transient ISearchOperationCallback searchOperationCallback;
+ protected final IIndexArtifactMap indexArtifactMap;
+
+ protected IIndex index;
+ protected long resourceID = -1;
public IndexDataflowHelper(IIndexOperatorDescriptor opDesc, final IHyracksTaskContext ctx, int partition) {
this.opDesc = opDesc;
this.ctx = ctx;
this.partition = partition;
+ this.indexArtifactMap = opDesc.getStorageManager().getIndexArtifactMap(ctx);
}
public void init(boolean forceCreate) throws HyracksDataException {
IndexRegistry<IIndex> indexRegistry = opDesc.getIndexRegistryProvider().getRegistry(ctx);
- IIndexArtifactMap indexArtifactMap = opDesc.getStorageManager().getIndexArtifactMap(ctx);
- FileReference fileRef = getFilereference();
-
- // check whether the requested index instance already exists by
- // retrieving indexRegistry with resourceId
- // To do so, checking the index directory in the first ioDevice is
- // sufficient.
-
- // create a fullDir(= IODeviceDir + baseDir) for the requested index
- IIOManager ioManager = ctx.getIOManager();
- List<IODeviceHandle> ioDeviceHandles = ioManager.getIODevices();
+ FileReference file = getFilereference();
+ List<IODeviceHandle> ioDeviceHandles = ctx.getIOManager().getIODevices();
String fullDir = ioDeviceHandles.get(0).getPath().toString();
- if (!fullDir.endsWith(System.getProperty("file.separator"))) {
- fullDir += System.getProperty("file.separator");
+ if (!fullDir.endsWith(File.separator)) {
+ fullDir += File.separator;
}
- String baseDir = fileRef.getFile().getPath();
- if (!baseDir.endsWith(System.getProperty("file.separator"))) {
- baseDir += System.getProperty("file.separator");
+ String baseDir = file.getFile().getPath();
+ if (!baseDir.endsWith(File.separator)) {
+ baseDir += File.separator;
}
fullDir += baseDir;
- // get the corresponding resourceId with the fullDir
- long resourceId = indexArtifactMap.get(fullDir);
+ resourceID = indexArtifactMap.get(fullDir);
// Create new index instance and register it.
synchronized (indexRegistry) {
// Check if the index has already been registered.
boolean register = false;
- index = indexRegistry.get(resourceId);
+ index = indexRegistry.get(resourceID);
if (index == null) {
index = createIndexInstance();
register = true;
@@ -84,20 +71,16 @@
index.create();
// Create new resourceId
try {
- resourceId = indexArtifactMap.create(baseDir, ioDeviceHandles);
+ resourceID = indexArtifactMap.create(baseDir, ioDeviceHandles);
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
if (register) {
index.activate();
- indexRegistry.register(resourceId, index);
+ indexRegistry.register(resourceID, index);
}
}
-
- //set operationCallback object
- modificationOperationCallback = opDesc.getOpCallbackProvider().getModificationOperationCallback(resourceId);
- searchOperationCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(resourceId);
}
public abstract IIndex createIndexInstance() throws HyracksDataException;
@@ -122,20 +105,7 @@
return opDesc;
}
- public int getIndexFileId() {
- return indexFileId;
+ public long getResourceID() {
+ return resourceID;
}
-
- public IOperationCallbackProvider getOpCallbackProvider() {
- return opDesc.getOpCallbackProvider();
- }
-
- public IModificationOperationCallback getModificationOperationCallback() {
- return modificationOperationCallback;
- }
-
- public ISearchOperationCallback getSearchOperationCallback() {
- return searchOperationCallback;
- }
-
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 33796e6..3c67fcd 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -24,19 +24,23 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
public class TreeIndexDiskOrderScanOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+ private final AbstractTreeIndexOperatorDescriptor opDesc;
private final TreeIndexDataflowHelper treeIndexHelper;
private ITreeIndex treeIndex;
public TreeIndexDiskOrderScanOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition) {
- treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition);
+ this.opDesc = opDesc;
+ this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+ .createIndexDataflowHelper(opDesc, ctx, partition);
}
@Override
@@ -46,8 +50,10 @@
treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
ITreeIndexFrame cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
ITreeIndexCursor cursor = treeIndexHelper.createDiskOrderScanCursor(cursorFrame);
+ ISearchOperationCallback searchCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(
+ treeIndexHelper.getResourceID());
ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor(
- treeIndexHelper.getModificationOperationCallback(), treeIndexHelper.getSearchOperationCallback());
+ NoOpOperationCallback.INSTANCE, searchCallback);
writer.open();
try {
indexAccessor.diskOrderScan(cursor);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 017cb75..d7a4b99 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
public class TreeIndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -41,12 +42,13 @@
private ByteBuffer writeBuffer;
private IIndexAccessor indexAccessor;
private ITupleFilter tupleFilter;
+ private IModificationOperationCallback modCallback;
public TreeIndexInsertUpdateDeleteOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
IRecordDescriptorProvider recordDescProvider, IndexOp op) {
- treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition);
+ this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+ .createIndexDataflowHelper(opDesc, ctx, partition);
this.recordDescProvider = recordDescProvider;
this.op = op;
tuple.setFieldPermutation(fieldPermutation);
@@ -63,8 +65,9 @@
try {
treeIndexHelper.init(false);
ITreeIndex treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
- indexAccessor = treeIndex.createAccessor(treeIndexHelper.getModificationOperationCallback(),
- treeIndexHelper.getSearchOperationCallback());
+ modCallback = opDesc.getOpCallbackProvider().getModificationOperationCallback(
+ treeIndexHelper.getResourceID());
+ indexAccessor = treeIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
tupleFilter = tupleFilterFactory.createTupleFilter(treeIndexHelper.ctx);
@@ -90,9 +93,8 @@
}
}
tuple.reset(accessor, i);
- IModificationOperationCallback modificationCallback = treeIndexHelper
- .getModificationOperationCallback();
- modificationCallback.before(tuple);
+
+ modCallback.before(tuple);
switch (op) {
case INSERT: {
indexAccessor.insert(tuple);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index 7e23d29..a49e9b3 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -30,11 +30,14 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
public abstract class TreeIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ protected final AbstractTreeIndexOperatorDescriptor opDesc;
protected final TreeIndexDataflowHelper treeIndexHelper;
protected FrameTupleAccessor accessor;
@@ -55,10 +58,11 @@
public TreeIndexSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, IRecordDescriptorProvider recordDescProvider) {
- treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
- opDesc, ctx, partition);
+ this.opDesc = opDesc;
+ this.treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
+ .createIndexDataflowHelper(opDesc, ctx, partition);
this.retainInput = treeIndexHelper.getOperatorDescriptor().getRetainInput();
- inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+ this.inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
}
protected abstract ISearchPredicate createSearchPredicate();
@@ -73,7 +77,7 @@
public void open() throws HyracksDataException {
accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
writer.open();
- try {
+ try {
treeIndexHelper.init(false);
treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
@@ -83,11 +87,12 @@
dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = treeIndex.createAccessor(treeIndexHelper.getModificationOperationCallback(),
- treeIndexHelper.getSearchOperationCallback());
+ ISearchOperationCallback searchCallback = opDesc.getOpCallbackProvider().getSearchOperationCallback(
+ treeIndexHelper.getResourceID());
+ indexAccessor = treeIndex.createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
cursor = createCursor();
if (retainInput) {
- frameTuple = new FrameTupleReference();
+ frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
treeIndexHelper.deinit();
@@ -100,9 +105,9 @@
tb.reset();
cursor.next();
if (retainInput) {
- frameTuple.reset(accessor, tupleIndex);
+ frameTuple.reset(accessor, tupleIndex);
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+ dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
tb.addFieldEndOffset();
}
}