Added LSM insert/delete operator that uses 'try' operations and partial flushing of Hyracks frames to avoid deadlocks with LSM component flushes.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2414 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4480c26..c9bddef 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -35,18 +35,18 @@
 import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
 public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
-    private final IIndexOperatorDescriptor opDesc;
-    private final IHyracksTaskContext ctx;
-    private final IIndexDataflowHelper indexHelper;
-    private final IRecordDescriptorProvider recordDescProvider;
-    private final IndexOperation op;
-    private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-    private FrameTupleAccessor accessor;
-    private FrameTupleReference frameTuple;
-    private ByteBuffer writeBuffer;
-    private IIndexAccessor indexAccessor;
-    private ITupleFilter tupleFilter;
-    private IModificationOperationCallback modCallback;
+    protected final IIndexOperatorDescriptor opDesc;
+    protected final IHyracksTaskContext ctx;
+    protected final IIndexDataflowHelper indexHelper;
+    protected final IRecordDescriptorProvider recordDescProvider;
+    protected final IndexOperation op;
+    protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+    protected FrameTupleAccessor accessor;
+    protected FrameTupleReference frameTuple;
+    protected ByteBuffer writeBuffer;
+    protected IIndexAccessor indexAccessor;
+    protected ITupleFilter tupleFilter;
+    protected IModificationOperationCallback modCallback;
 
     public IndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9c08645..d46c5c2 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -27,6 +27,8 @@
 public interface ILSMHarness {
     public boolean insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ictx, boolean tryOperation)
             throws HyracksDataException, IndexException;
+    
+    public boolean noOp(ILSMIndexOperationContext ictx, boolean tryOperation) throws HyracksDataException;            
 
     public List<Object> search(IIndexCursor cursor, ISearchPredicate pred, ILSMIndexOperationContext ctx,
             boolean includeMemComponent) throws HyracksDataException, IndexException;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index c6564b4..1ad1924 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -121,4 +121,25 @@
      *             If there is no matching tuple in the index.
      */
     public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+    
+    /**
+     * This method can be used to increase the number of 'active' operations of an index artificially,
+     * without actually modifying the index.
+     * If the operation would have to wait for a flush, false is returned instead of waiting.
+     * Otherwise, beforeOperation() and afterOperation() of the ILSMOperationTracker are called,
+     * and true is returned.
+     * 
+     * @throws HyracksDataException
+     */
+    public boolean tryNoOp() throws HyracksDataException;
+    
+    /**
+     * This method can be used to increase the number of 'active' operations of an index artificially,
+     * without actually modifying the index.
+     * This method may block waiting for a flush to finish, and will eventually call
+     * beforeOperation() and afterOperation() of the ILSMOperationTracker.
+     * 
+     * @throws HyracksDataException
+     */
+    public void noOp() throws HyracksDataException;
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
new file mode 100644
index 0000000..51f979f
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+/**
+ * Insert/update/delete/upsert operator for LSM indexes that first attempts every modification with the
+ * corresponding 'try' call of the {@link ILSMIndexAccessor}. When a 'try' call returns false, the tuples of
+ * the current frame that were already processed are flushed to the writer before the regular variant of the
+ * operation, which may block, is invoked.
+ */
+public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertUpdateDeleteOperatorNodePushable {
+
+    protected FrameTupleAppender appender;
+
+    public LSMIndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
+        super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
+    }
+
+    /**
+     * Applies {@code op} to every tuple of {@code buffer} and forwards each input tuple, including tuples
+     * rejected by the tuple filter, to the writer exactly once: either as one copy of the whole frame or, if
+     * a 'try' call returned false for a tuple other than the first one, as a sequence of partial frames.
+     *
+     * @param buffer
+     *            the input frame to process.
+     * @throws HyracksDataException
+     *             if an index operation or a flush fails, or if {@code op} is not supported.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+        // Index of the first tuple that has not been forwarded yet; 0 means nothing was flushed so far.
+        int nextFlushTupleIndex = 0;
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            try {
+                if (tupleFilter != null) {
+                    frameTuple.reset(accessor, i);
+                    if (!tupleFilter.accept(frameTuple)) {
+                        if (!lsmAccessor.tryNoOp()) {
+                            nextFlushTupleIndex = flushProcessedTuples(nextFlushTupleIndex, i);
+                            lsmAccessor.noOp();
+                        }
+                        continue;
+                    }
+                }
+                tuple.reset(accessor, i);
+
+                switch (op) {
+                    case INSERT: {
+                        if (!lsmAccessor.tryInsert(tuple)) {
+                            nextFlushTupleIndex = flushProcessedTuples(nextFlushTupleIndex, i);
+                            lsmAccessor.insert(tuple);
+                        }
+                        break;
+                    }
+                    case DELETE: {
+                        if (!lsmAccessor.tryDelete(tuple)) {
+                            nextFlushTupleIndex = flushProcessedTuples(nextFlushTupleIndex, i);
+                            lsmAccessor.delete(tuple);
+                        }
+                        break;
+                    }
+                    case UPSERT: {
+                        if (!lsmAccessor.tryUpsert(tuple)) {
+                            nextFlushTupleIndex = flushProcessedTuples(nextFlushTupleIndex, i);
+                            lsmAccessor.upsert(tuple);
+                        }
+                        break;
+                    }
+                    case UPDATE: {
+                        if (!lsmAccessor.tryUpdate(tuple)) {
+                            nextFlushTupleIndex = flushProcessedTuples(nextFlushTupleIndex, i);
+                            lsmAccessor.update(tuple);
+                        }
+                        break;
+                    }
+                    default: {
+                        throw new HyracksDataException("Unsupported operation " + op
+                                + " in tree index InsertUpdateDelete operator");
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if (nextFlushTupleIndex == 0) {
+            // No partial flushing was necessary. Forward entire frame.
+            System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
+            FrameUtils.flushFrame(writeBuffer, writer);
+        } else {
+            // Flush remaining partial frame.
+            flushPartialFrame(nextFlushTupleIndex, tupleCount);
+        }
+    }
+
+    /**
+     * Forwards the tuples [startTupleIndex, endTupleIndex), which were processed but not forwarded yet,
+     * before the (possibly blocking) operation on tuple endTupleIndex is invoked. An empty range is
+     * skipped so that no empty frame is sent to the writer.
+     *
+     * @return endTupleIndex, the start of the next flush. Restarting at endTupleIndex - 1 instead would
+     *         forward that tuple a second time.
+     */
+    private int flushProcessedTuples(int startTupleIndex, int endTupleIndex) throws HyracksDataException {
+        if (startTupleIndex < endTupleIndex) {
+            flushPartialFrame(startTupleIndex, endTupleIndex);
+        }
+        return endTupleIndex;
+    }
+
+    /**
+     * Appends the tuples [startTupleIndex, endTupleIndex) of the current input frame to
+     * {@code writeBuffer} and flushes that buffer to the writer.
+     *
+     * @throws IllegalStateException
+     *             if a tuple cannot be appended to the frame.
+     */
+    private void flushPartialFrame(int startTupleIndex, int endTupleIndex) throws HyracksDataException {
+        if (appender == null) {
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+        }
+        appender.reset(writeBuffer, true);
+        for (int i = startTupleIndex; i < endTupleIndex; i++) {
+            if (!appender.append(accessor, i)) {
+                throw new IllegalStateException("Failed to append tuple into frame.");
+            }
+        }
+        FrameUtils.flushFrame(writeBuffer, writer);
+    }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
new file mode 100644
index 0000000..d763356
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+/**
+ * Operator descriptor for insert/update/delete operations on LSM tree indexes. Its push runtime is an
+ * {@link LSMIndexInsertUpdateDeleteOperatorNodePushable}.
+ */
+public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fieldPermutation;
+    private final IndexOperation op;
+
+    /**
+     * Keeps {@code fieldPermutation} and {@code op} for {@link #createPushRuntime} and passes the remaining
+     * arguments on to the superclass constructor.
+     */
+    public LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOperation op,
+            IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
+            IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+        super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, dataflowHelperFactory, tupleFilterFactory, false,
+                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                modificationOpCallbackProvider);
+        this.fieldPermutation = fieldPermutation;
+        this.op = op;
+    }
+
+    /**
+     * Creates the {@link LSMIndexInsertUpdateDeleteOperatorNodePushable} for the given partition, handing it
+     * the stored field permutation and index operation.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+                recordDescProvider, op);
+    }
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index c4ac36e..00020e6 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -106,6 +106,15 @@
         return true;
     }
 
+    @Override
+    public boolean noOp(ILSMIndexOperationContext ctx, boolean tryOperation) throws HyracksDataException {
+        if (!opTracker.beforeOperation(ctx, tryOperation)) {
+            return false; // beforeOperation() returned false: report that the no-op was not performed.
+        }
+        threadExit(ctx); // The index is not touched between beforeOperation() and threadExit().
+        return true;
+    }
+    
     public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Flushing LSM-Index: " + lsmIndex);
@@ -281,5 +290,4 @@
     public ILSMIndex getIndex() {
         return lsmIndex;
     }
-
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index fd0d704..d752ab4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -113,4 +113,14 @@
             IndexException {
         return lsmHarness.createMergeOperation(callback);
     }
+    
+    @Override
+    public boolean tryNoOp() throws HyracksDataException {
+        return lsmHarness.noOp(ctx, true); // tryOperation = true; the harness result is returned as is.
+    }
+
+    @Override
+    public void noOp() throws HyracksDataException {
+        lsmHarness.noOp(ctx, false); // tryOperation = false; the boolean result is discarded.
+    }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
index 4b455e7..963c653 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -25,9 +25,9 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -46,7 +46,7 @@
             IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory) {
-        super(spec, 0, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+        super(spec, 1, 1, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
                 dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
                 NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
@@ -57,7 +57,7 @@
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new IndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+        return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
                 recordDescProvider, op);
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 45d21e9..cd870b4 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -119,6 +119,16 @@
     }
 
     @Override
+    public boolean tryNoOp() throws HyracksDataException {
+        return lsmHarness.noOp(ctx, true); // tryOperation = true; the harness result is returned as is.
+    }
+
+    @Override
+    public void noOp() throws HyracksDataException {
+        lsmHarness.noOp(ctx, false); // tryOperation = false; the boolean result is discarded.
+    }
+
+    @Override
     public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
         throw new UnsupportedOperationException("Physical delete not supported by lsm inverted index.");
     }