Added LSM insert/delete operator that uses 'try' operations and partial flushing of Hyracks frames to avoid deadlocks with LSM component flushes.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2414 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4480c26..c9bddef 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -35,18 +35,18 @@
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- private final IIndexOperatorDescriptor opDesc;
- private final IHyracksTaskContext ctx;
- private final IIndexDataflowHelper indexHelper;
- private final IRecordDescriptorProvider recordDescProvider;
- private final IndexOperation op;
- private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
- private FrameTupleAccessor accessor;
- private FrameTupleReference frameTuple;
- private ByteBuffer writeBuffer;
- private IIndexAccessor indexAccessor;
- private ITupleFilter tupleFilter;
- private IModificationOperationCallback modCallback;
+ protected final IIndexOperatorDescriptor opDesc;
+ protected final IHyracksTaskContext ctx;
+ protected final IIndexDataflowHelper indexHelper;
+ protected final IRecordDescriptorProvider recordDescProvider;
+ protected final IndexOperation op;
+ protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+ protected FrameTupleAccessor accessor;
+ protected FrameTupleReference frameTuple;
+ protected ByteBuffer writeBuffer;
+ protected IIndexAccessor indexAccessor;
+ protected ITupleFilter tupleFilter;
+ protected IModificationOperationCallback modCallback;
public IndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9c08645..d46c5c2 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -27,6 +27,8 @@
public interface ILSMHarness {
public boolean insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ictx, boolean tryOperation)
throws HyracksDataException, IndexException;
+
+ public boolean noOp(ILSMIndexOperationContext ictx, boolean tryOperation) throws HyracksDataException;
public List<Object> search(IIndexCursor cursor, ISearchPredicate pred, ILSMIndexOperationContext ctx,
boolean includeMemComponent) throws HyracksDataException, IndexException;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index c6564b4..1ad1924 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -121,4 +121,25 @@
* If there is no matching tuple in the index.
*/
public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * This method can be used to increase the number of 'active' operations of an index artificially,
+ * without actually modifying the index.
+ * If the operation would have to wait for a flush then we return false.
+ * Otherwise, beforeOperation() and afterOperation() of the ILSMOperationTracker are called,
+ * and true is returned.
+ *
+ * @throws HyracksDataException
+ */
+ public boolean tryNoOp() throws HyracksDataException;
+
+ /**
+ * This method can be used to increase the number of 'active' operations of an index artificially,
+ * without actually modifying the index.
+ * This method may block waiting for a flush to finish, and will eventually call
+ * beforeOperation() and afterOperation() of the ILSMOperationTracker.
+ *
+ * @throws HyracksDataException
+ */
+ public void noOp() throws HyracksDataException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
new file mode 100644
index 0000000..51f979f
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertUpdateDeleteOperatorNodePushable {
+
+ protected FrameTupleAppender appender;
+
+ public LSMIndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
+ super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+ int lastFlushedTupleIndex = 0;
+ int tupleCount = accessor.getTupleCount();
+ for (int i = 0; i < tupleCount; i++) {
+ try {
+ if (tupleFilter != null) {
+ frameTuple.reset(accessor, i);
+ if (!tupleFilter.accept(frameTuple)) {
+ if (!lsmAccessor.tryNoOp()) {
+ flushPartialFrame(lastFlushedTupleIndex, i);
+ lastFlushedTupleIndex = (i == 0) ? 0 : i - 1;
+ lsmAccessor.noOp();
+ }
+ continue;
+ }
+ }
+ tuple.reset(accessor, i);
+
+ switch (op) {
+ case INSERT: {
+ if (!lsmAccessor.tryInsert(tuple)) {
+ flushPartialFrame(lastFlushedTupleIndex, i);
+ lastFlushedTupleIndex = (i == 0) ? 0 : i - 1;
+ lsmAccessor.insert(tuple);
+ }
+ break;
+ }
+ case DELETE: {
+ if (!lsmAccessor.tryDelete(tuple)) {
+ flushPartialFrame(lastFlushedTupleIndex, i);
+ lastFlushedTupleIndex = (i == 0) ? 0 : i - 1;
+ lsmAccessor.delete(tuple);
+ }
+ break;
+ }
+ case UPSERT: {
+ if (!lsmAccessor.tryUpsert(tuple)) {
+ flushPartialFrame(lastFlushedTupleIndex, i);
+ lastFlushedTupleIndex = (i == 0) ? 0 : i - 1;
+ lsmAccessor.upsert(tuple);
+ }
+ break;
+ }
+ case UPDATE: {
+ if (!lsmAccessor.tryUpdate(tuple)) {
+ flushPartialFrame(lastFlushedTupleIndex, i);
+ lastFlushedTupleIndex = (i == 0) ? 0 : i - 1;
+ lsmAccessor.update(tuple);
+ }
+ break;
+ }
+ default: {
+ throw new HyracksDataException("Unsupported operation " + op
+ + " in tree index InsertUpdateDelete operator");
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if (lastFlushedTupleIndex == 0) {
+ // No partial flushing was necessary. Forward entire frame.
+ System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
+ FrameUtils.flushFrame(writeBuffer, writer);
+ } else {
+ // Flush remaining partial frame.
+ flushPartialFrame(lastFlushedTupleIndex, tupleCount);
+ }
+ }
+
+ private void flushPartialFrame(int startTupleIndex, int endTupleIndex) throws HyracksDataException {
+ if (appender == null) {
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ }
+ appender.reset(writeBuffer, true);
+ for (int i = startTupleIndex; i < endTupleIndex; i++) {
+ if (!appender.append(accessor, i)) {
+ throw new IllegalStateException("Failed to append tuple into frame.");
+ }
+ }
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
new file mode 100644
index 0000000..d763356
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int[] fieldPermutation;
+ private final IndexOperation op;
+
+ public LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOperation op,
+ IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+ super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, dataflowHelperFactory, tupleFilterFactory, false,
+ NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+ modificationOpCallbackProvider);
+ this.fieldPermutation = fieldPermutation;
+ this.op = op;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, op);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index c4ac36e..00020e6 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -106,6 +106,15 @@
return true;
}
+ @Override
+ public boolean noOp(ILSMIndexOperationContext ctx, boolean tryOperation) throws HyracksDataException {
+ if (!opTracker.beforeOperation(ctx, tryOperation)) {
+ return false;
+ }
+ threadExit(ctx);
+ return true;
+ }
+
public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Flushing LSM-Index: " + lsmIndex);
@@ -281,5 +290,4 @@
public ILSMIndex getIndex() {
return lsmIndex;
}
-
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index fd0d704..d752ab4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -113,4 +113,14 @@
IndexException {
return lsmHarness.createMergeOperation(callback);
}
+
+ @Override
+ public boolean tryNoOp() throws HyracksDataException {
+ return lsmHarness.noOp(ctx, true);
+ }
+
+ @Override
+ public void noOp() throws HyracksDataException {
+ lsmHarness.noOp(ctx, false);
+ }
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
index 4b455e7..963c653 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -25,9 +25,9 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -46,7 +46,7 @@
IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory) {
- super(spec, 0, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+ super(spec, 1, 1, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
@@ -57,7 +57,7 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new IndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
recordDescProvider, op);
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 45d21e9..cd870b4 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -119,6 +119,16 @@
}
@Override
+ public boolean tryNoOp() throws HyracksDataException {
+ return lsmHarness.noOp(ctx, true);
+ }
+
+ @Override
+ public void noOp() throws HyracksDataException {
+ lsmHarness.noOp(ctx, false);
+ }
+
+ @Override
public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
throw new UnsupportedOperationException("Physical delete not supported by lsm inverted index.");
}