change indexes in a dataset to share indexoptracker incl. partitions on same machine
add specialized LSM insert/delete operators at the asterix level
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 8170624..4ea338c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -18,11 +18,6 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -33,7 +28,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
@@ -68,10 +63,6 @@
private ITransactionSubsystem txnSubsystem;
private ILSMMergePolicy mergePolicy;
- private ILSMOperationTrackerFactory lsmBTreeSecondaryOpTrackerFactory;
- private ILSMOperationTrackerFactory lsmBTreePrimaryOpTrackerFactory;
- private ILSMOperationTrackerFactory lsmRTreeOpTrackerFactory;
- private ILSMOperationTrackerFactory lsmInvertedIndexOpTrackerFactory;
private ILSMIOOperationScheduler lsmIOScheduler;
private ILocalResourceRepository localResourceRepository;
private ResourceIdFactory resourceIdFactory;
@@ -103,14 +94,6 @@
lsmIOScheduler = SynchronousScheduler.INSTANCE;
mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
- lsmBTreePrimaryOpTrackerFactory = new PrimaryIndexOperationTrackerFactory(
- LSMBTreeIOOperationCallbackFactory.INSTANCE);
- lsmBTreeSecondaryOpTrackerFactory = new SecondaryIndexOperationTrackerFactory(
- LSMBTreeIOOperationCallbackFactory.INSTANCE);
- lsmRTreeOpTrackerFactory = new SecondaryIndexOperationTrackerFactory(
- LSMRTreeIOOperationCallbackFactory.INSTANCE);
- lsmInvertedIndexOpTrackerFactory = new SecondaryIndexOperationTrackerFactory(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE);
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
ioManager);
@@ -163,18 +146,6 @@
return storageProperties.getBloomFilterFalsePositiveRate();
}
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory(boolean isPrimary) {
- return isPrimary ? lsmBTreePrimaryOpTrackerFactory : lsmBTreeSecondaryOpTrackerFactory;
- }
-
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory() {
- return lsmRTreeOpTrackerFactory;
- }
-
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory() {
- return lsmInvertedIndexOpTrackerFactory;
- }
-
public ILSMIOOperationScheduler getLSMIOScheduler() {
return lsmIOScheduler;
}
@@ -224,4 +195,9 @@
public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
return indexLifecycleManager.getVirtualBufferCache(datasetID);
}
+
+ @Override
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
+ return indexLifecycleManager.getOperationTracker(datasetID);
+ }
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index 18d193b..1035f0c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -7,7 +7,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -73,21 +73,6 @@
}
@Override
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory(boolean isPrimary) {
- return asterixAppRuntimeContext.getLSMBTreeOperationTrackerFactory(isPrimary);
- }
-
- @Override
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory() {
- return asterixAppRuntimeContext.getLSMRTreeOperationTrackerFactory();
- }
-
- @Override
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory() {
- return asterixAppRuntimeContext.getLSMInvertedIndexOperationTrackerFactory();
- }
-
- @Override
public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
return asterixAppRuntimeContext.getVirtualBufferCache(datasetID);
}
@@ -115,4 +100,9 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
+ return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
+ }
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 9129981..47aaac8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -42,6 +42,7 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -126,7 +127,7 @@
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()));
@@ -179,7 +180,7 @@
splitsAndConstraint.first, typeTraits, comparatorFactories, blooFilterKeyFields,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
@@ -270,7 +271,7 @@
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, numElementsHint, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index c102d0c..46f5b1a 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -40,6 +40,7 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.IsNullDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -282,7 +283,7 @@
primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), false,
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index f609bf4..1d94da9 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2113,6 +2113,11 @@
<output-dir compare="Text">float_01</output-dir>
</compilation-unit>
</test-case>
+ <!-- <test-case FilePath="misc">
+ <compilation-unit name="flushtest">
+ <output-dir compare="Text">flushtest</output-dir>
+ </compilation-unit>
+ </test-case> -->
<test-case FilePath="misc">
<compilation-unit name="groupby-orderby-count">
<output-dir compare="Text">groupby-orderby-count</output-dir>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
index 6bfd3b47..964b761 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -9,7 +9,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -38,7 +38,7 @@
public ResourceIdFactory getResourceIdFactory();
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory(boolean isPrimary);
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
public void initialize() throws IOException, ACIDException, AsterixException;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java
index d03b15c..d7bc0f3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixRuntimeComponentsProvider.java
@@ -5,7 +5,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
@@ -23,11 +23,11 @@
public ILSMMergePolicy getLSMMergePolicy();
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory();
+ public ILSMOperationTrackerProvider getLSMBTreeOperationTrackerFactory();
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory();
+ public ILSMOperationTrackerProvider getLSMRTreeOperationTrackerFactory();
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory();
+ public ILSMOperationTrackerProvider getLSMInvertedIndexOperationTrackerFactory();
public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
index 8ea64b8..253b30c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
@@ -15,7 +15,7 @@
private static final int STORAGE_MEMORYCOMPONENT_PAGESIZE_DEFAULT = (32 << 10); // 32KB
private static final String STORAGE_MEMORYCOMPONENT_NUMPAGES_KEY = "storage.memorycomponent.numpages";
- private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 1024; // ... so 32MB components
+ private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 8; // ... so 32MB components
private static final String STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY = "storage.memorycomponent.globalbudget";
private static final long STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT = 536870912; // 512MB
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
similarity index 77%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/BaseOperationTracker.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index af601d4..2e50bf6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.transaction.management.opcallbacks;
+package edu.uci.ics.asterix.common.context;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -12,13 +12,11 @@
public class BaseOperationTracker implements ILSMOperationTracker {
- protected final ILSMIndex index;
protected final ILSMIOOperationCallback ioOpCallback;
protected long lastLSN;
protected long firstLSN;
- public BaseOperationTracker(ILSMIndex index, ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- this.index = index;
+ public BaseOperationTracker(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
.createIOOperationCallback(this);
resetLSNs();
@@ -49,17 +47,17 @@
}
@Override
- public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
}
@Override
- public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
}
@Override
- public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 96f5330..25bfbd1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -3,15 +3,19 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
@@ -22,6 +26,7 @@
public class DatasetLifecycleManager implements IIndexLifecycleManager {
private final AsterixStorageProperties storageProperties;
private final Map<Integer, MultitenantVirtualBufferCache> datasetVirtualBufferCaches;
+ private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
private final Map<Integer, DatasetInfo> datasetInfos;
private final ILocalResourceRepository resourceRepository;
private final long capacity;
@@ -32,6 +37,7 @@
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
datasetVirtualBufferCaches = new HashMap<Integer, MultitenantVirtualBufferCache>();
+ datasetOpTrackers = new HashMap<Integer, ILSMOperationTracker>();
datasetInfos = new HashMap<Integer, DatasetInfo>();
capacity = storageProperties.getMemoryComponentGlobalBudget();
used = 0;
@@ -196,6 +202,31 @@
}
}
+ public ILSMOperationTracker getOperationTracker(int datasetID) {
+ synchronized (datasetOpTrackers) {
+ ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
+ if (opTracker == null) {
+ opTracker = new PrimaryIndexOperationTracker(this, datasetID,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ datasetOpTrackers.put(datasetID, opTracker);
+ }
+
+ return opTracker;
+ }
+ }
+
+ public synchronized Set<ILSMIndex> getDatasetIndexes(int datasetID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
+ if (dsInfo == null) {
+ throw new HyracksDataException("No dataset found with datasetID " + datasetID);
+ }
+ Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ datasetIndexes.add(iInfo.index);
+ }
+ return datasetIndexes;
+ }
+
private static abstract class Info {
protected int referenceCount;
protected boolean isOpen;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
new file mode 100644
index 0000000..0d7dd93
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.context;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
+
+public class PrimaryIndexOperationTracker extends BaseOperationTracker {
+
+ private final DatasetLifecycleManager datasetLifecycleManager;
+ private final IVirtualBufferCache datasetBufferCache;
+ private final int datasetID;
+ // Number of active operations on a ILSMIndex instance.
+ private AtomicInteger numActiveOperations;
+
+ public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ super(ioOpCallbackFactory);
+ this.datasetLifecycleManager = datasetLifecycleManager;
+ this.numActiveOperations = new AtomicInteger(0);
+ this.datasetID = datasetID;
+ datasetBufferCache = datasetLifecycleManager.getVirtualBufferCache(datasetID);
+ }
+
+ @Override
+ public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ numActiveOperations.incrementAndGet();
+
+ // Increment transactor-local active operations count.
+ AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+ if (opCallback != null) {
+ opCallback.incrementLocalNumActiveOperations();
+ }
+ }
+
+ @Override
+ public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ // Searches are immediately considered complete, because they should not prevent the execution of flushes.
+ if ((searchCallback != null && searchCallback != NoOpOperationCallback.INSTANCE)
+ || opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ completeOperation(index, opType, searchCallback, modificationCallback);
+ }
+ }
+
+ @Override
+ public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ int nActiveOps = numActiveOperations.decrementAndGet();
+
+ // Decrement transactor-local active operations count.
+ AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
+ if (opCallback != null) {
+ opCallback.decrementLocalNumActiveOperations();
+ }
+ // If we need a flush, and this is the last completing operation, then schedule the flush.
+ if (datasetBufferCache.isFull() && nActiveOps == 0 && opType != LSMOperationType.FLUSH) {
+ Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
+ for (ILSMIndex lsmIndex : indexes) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(ioOpCallback);
+ }
+
+ }
+ }
+
+ private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) {
+
+ if (modificationCallback == NoOpOperationCallback.INSTANCE || modificationCallback == null) {
+ return null;
+ }
+ if (searchCallback != null && searchCallback != NoOpOperationCallback.INSTANCE) {
+ return (AbstractOperationCallback) searchCallback;
+ } else {
+ return (AbstractOperationCallback) modificationCallback;
+ }
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
new file mode 100644
index 0000000..93a86e4
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -0,0 +1,71 @@
+package edu.uci.ics.asterix.common.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+
+public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+
+ private final boolean isPrimary;
+
+ public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+ int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
+ boolean isPrimary) {
+ super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
+ this.isPrimary = isPrimary;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ boolean first = true;
+ accessor.reset(buffer);
+ ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+ int tupleCount = accessor.getTupleCount();
+ try {
+ for (int i = 0; i < tupleCount; i++) {
+ if (tupleFilter != null) {
+ frameTuple.reset(accessor, i);
+ if (!tupleFilter.accept(frameTuple)) {
+ lsmAccessor.noOp();
+ continue;
+ }
+ }
+ tuple.reset(accessor, i);
+ switch (op) {
+ case INSERT:
+ if (first && isPrimary) {
+ lsmAccessor.insert(tuple);
+ first = false;
+ } else {
+ lsmAccessor.forceInsert(tuple);
+ }
+ break;
+ case DELETE:
+ if (first && isPrimary) {
+ lsmAccessor.delete(tuple);
+ first = false;
+ } else {
+ lsmAccessor.forceDelete(tuple);
+ }
+ break;
+ default: {
+ throw new HyracksDataException("Unsupported operation " + op
+ + " in tree index InsertDelete operator");
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
new file mode 100644
index 0000000..15ad465
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.asterix.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexInsertUpdateDeleteOperator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor extends LSMInvertedIndexInsertUpdateDeleteOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ public AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ RecordDescriptor recDesc, IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
+ IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
+ IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
+ IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
+ int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
+ ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+ super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+ tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
+ fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, op, false);
+ }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
new file mode 100644
index 0000000..b1bd6ce
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.asterix.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+
+public class AsterixLSMTreeInsertDeleteOperatorDescriptor extends LSMTreeIndexInsertUpdateDeleteOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isPrimary;
+
+ public AsterixLSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
+ IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
+ ITupleFilterFactory tupleFilterFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider, boolean isPrimary) {
+ super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, fieldPermutation, op, dataflowHelperFactory,
+ tupleFilterFactory, modificationOpCallbackProvider);
+ this.isPrimary = isPrimary;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AsterixLSMInsertDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+ recordDescProvider, op, isPrimary);
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
similarity index 95%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 76265b8..c9c01e8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -13,11 +13,11 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
similarity index 92%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index da29894..1fe77a0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -13,11 +13,11 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
similarity index 89%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 89e2e99..7524ddb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
similarity index 93%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 1b0e162..fb30742 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -13,11 +13,11 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
similarity index 89%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index 1752f6c..5e5b0ed 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
similarity index 93%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index fb6c864..324cccb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -13,11 +13,11 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
import java.util.List;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
similarity index 89%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 6048fd5..748d93a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -13,9 +13,9 @@
* limitations under the License.
*/
-package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
index dc7b543..5e53060 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.common.transactions;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
@@ -26,7 +28,7 @@
protected final int[] primaryKeyFields;
protected final ITransactionContext txnCtx;
protected final ILockManager lockManager;
- protected int transactorLocalNumActiveOperations = 0;
+ protected final AtomicInteger transactorLocalNumActiveOperations;
protected final long[] longHashes;
public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
@@ -35,6 +37,7 @@
this.primaryKeyFields = primaryKeyFields;
this.txnCtx = txnCtx;
this.lockManager = lockManager;
+ this.transactorLocalNumActiveOperations = new AtomicInteger(0);
this.longHashes = new long[2];
}
@@ -44,14 +47,14 @@
}
public int getLocalNumActiveOperations() {
- return transactorLocalNumActiveOperations;
+ return transactorLocalNumActiveOperations.get();
}
public void incrementLocalNumActiveOperations() {
- transactorLocalNumActiveOperations++;
+ transactorLocalNumActiveOperations.incrementAndGet();
}
public void decrementLocalNumActiveOperations() {
- transactorLocalNumActiveOperations--;
+ transactorLocalNumActiveOperations.decrementAndGet();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 0a340d4..b53698b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -5,7 +5,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -26,11 +26,7 @@
public ILSMMergePolicy getLSMMergePolicy();
- public ILSMOperationTrackerFactory getLSMBTreeOperationTrackerFactory(boolean isPrimary);
-
- public ILSMOperationTrackerFactory getLSMRTreeOperationTrackerFactory();
-
- public ILSMOperationTrackerFactory getLSMInvertedIndexOperationTrackerFactory();
+ public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index a06cc75..1ffd5f1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -1,7 +1,6 @@
package edu.uci.ics.asterix.common.transactions;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -14,6 +13,8 @@
public void decreaseActiveTransactionCountOnIndexes() throws HyracksDataException;
+ public int getActiveOperationCountOnIndexes() throws HyracksDataException;
+
public LogicalLogLocator getFirstLogLocator();
public LogicalLogLocator getLastLogLocator();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 9963ec7..6485c16 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
@@ -273,6 +274,7 @@
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
txnCtx.setTransactionType(TransactionType.READ_WRITE);
+ txnCtx.registerIndexAndCallback(lsmIndex, (AbstractOperationCallback) modCallback);
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.insert(tuple);
@@ -575,6 +577,7 @@
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
txnCtx.setTransactionType(TransactionType.READ_WRITE);
+ txnCtx.registerIndexAndCallback(lsmIndex, (AbstractOperationCallback) modCallback);
indexAccessor.delete(tuple);
indexLifecycleManager.close(resourceID);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 22665aa..b4f4ee5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -32,7 +32,9 @@
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.TransactionalResourceManagerRepository;
import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
@@ -72,6 +74,7 @@
import edu.uci.ics.hyracks.storage.am.common.util.IndexFileNameUtil;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -337,11 +340,12 @@
long resourceID = -1;
AsterixRuntimeComponentsProvider rtcProvider = index.isPrimaryIndex() ? AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER
: AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
+ ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
+ .getDatasetId().getId()) : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE);
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, ioManager, file, bufferCache, fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
- runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
- runtimeContext.getLSMBTreeOperationTrackerFactory(index.isPrimaryIndex()),
+ runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(), opTracker,
runtimeContext.getLSMIOScheduler(), rtcProvider, runtimeContext.getMetaDataIODeviceId());
lsmBtree.create();
resourceID = runtimeContext.getResourceIdFactory().createId();
@@ -362,8 +366,8 @@
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCache, ioManager, file, bufferCache,
fileMapProvider, typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
- runtimeContext.getLSMBTreeOperationTrackerFactory(index.isPrimaryIndex()),
- runtimeContext.getLSMIOScheduler(), AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ opTracker, runtimeContext.getLSMIOScheduler(),
+ AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
runtimeContext.getMetaDataIODeviceId());
indexLifecycleManager.register(resourceID, lsmBtree);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 4abb5d4..998eac4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -30,6 +30,8 @@
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
@@ -75,6 +77,7 @@
import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
@@ -130,9 +133,7 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexInsertUpdateDeleteOperator;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
@@ -780,7 +781,7 @@
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, false, numElementsHint, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
@@ -842,18 +843,18 @@
PrimaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
- LSMTreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(
+ AsterixLSMTreeInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
- .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory);
+ .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
} catch (MetadataException me) {
@@ -1038,7 +1039,7 @@
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
- LSMTreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(
+ AsterixLSMTreeInsertDeleteOperatorDescriptor btreeBulkLoad = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
@@ -1047,7 +1048,8 @@
AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, storageProperties
- .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
+ .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
+ false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1164,7 +1166,7 @@
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_INVERTED_INDEX);
- LSMInvertedIndexInsertUpdateDeleteOperator insertDeleteOp = new LSMInvertedIndexInsertUpdateDeleteOperator(
+ AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor insertDeleteOp = new AsterixLSMInvertedIndexInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), splitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
@@ -1257,7 +1259,7 @@
SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
- LSMTreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(
+ AsterixLSMTreeInsertDeleteOperatorDescriptor rtreeUpdate = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(),
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexOp, new LSMRTreeDataflowHelperFactory(
@@ -1269,7 +1271,7 @@
AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
- modificationCallbackFactory);
+ modificationCallbackFactory, false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTracker.java
deleted file mode 100644
index 68713d8..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTracker.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.transaction.management.opcallbacks;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
-
-public class PrimaryIndexOperationTracker extends BaseOperationTracker {
-
- // Number of active operations on a ILSMIndex instance.
- private AtomicInteger numActiveOperations;
- private ILSMIndexAccessor accessor;
-
- public PrimaryIndexOperationTracker(ILSMIndex index, ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- super(index, ioOpCallbackFactory);
- this.numActiveOperations = new AtomicInteger(0);
- }
-
- @Override
- public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- if (opType != LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations.incrementAndGet();
-
- // Increment transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
- if (opCallback != null) {
- opCallback.incrementLocalNumActiveOperations();
- }
- }
- }
-
- @Override
- public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- // Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if (searchCallback != null) {
- completeOperation(opType, searchCallback, modificationCallback);
- }
- }
-
- @Override
- public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- int nActiveOps = numActiveOperations.decrementAndGet();
- // Decrement transactor-local active operations count.
- AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
- if (opCallback != null) {
- opCallback.decrementLocalNumActiveOperations();
- }
- // If we need a flush, and this is the last completing operation, then schedule the flush.
- // Once the flush has completed notify all waiting operations.
- if (index.getFlushStatus() && nActiveOps == 0 && opType != LSMOperationType.FLUSH) {
- if (accessor == null) {
- accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- }
- accessor.scheduleFlush(ioOpCallback);
- }
- }
-
- private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) {
-
- if (searchCallback == NoOpOperationCallback.INSTANCE || modificationCallback == NoOpOperationCallback.INSTANCE) {
- return null;
- }
- if (searchCallback != null) {
- return (AbstractOperationCallback) searchCallback;
- } else {
- return (AbstractOperationCallback) modificationCallback;
- }
- }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
deleted file mode 100644
index 20d8931..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.asterix.transaction.management.opcallbacks;
-
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
-
-public class PrimaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
-
- private static final long serialVersionUID = 1L;
-
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
-
- public PrimaryIndexOperationTrackerFactory(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- this.ioOpCallbackFactory = ioOpCallbackFactory;
- }
-
- @Override
- public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
- return new PrimaryIndexOperationTracker(index, ioOpCallbackFactory);
- }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
new file mode 100644
index 0000000..7b7198e
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
+
+public class PrimaryIndexOperationTrackerProvider implements ILSMOperationTrackerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int datasetID;
+
+ public PrimaryIndexOperationTrackerProvider(int datasetID) {
+ this.datasetID = datasetID;
+ }
+
+ @Override
+ public ILSMOperationTracker createOperationTracker(IHyracksTaskContext ctx) {
+ DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
+ .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+ return dslcManager.getOperationTracker(datasetID);
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index 8871b7a..4703875 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -1,11 +1,12 @@
package edu.uci.ics.asterix.transaction.management.opcallbacks;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
-public class SecondaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
+public class SecondaryIndexOperationTrackerFactory implements ILSMOperationTrackerProvider {
private static final long serialVersionUID = 1L;
@@ -16,8 +17,8 @@
}
@Override
- public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
- return new BaseOperationTracker(index, ioOpCallbackFactory);
+ public ILSMOperationTracker createOperationTracker(IHyracksTaskContext ctx) {
+ return new BaseOperationTracker(ioOpCallbackFactory);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 9879cec..ac4ca61 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -2,6 +2,8 @@
import java.io.File;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -51,8 +53,9 @@
LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCache, runtimeContextProvider.getIOManager(),
file, runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits,
cmpFactories, bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(), runtimeContextProvider
- .getLSMBTreeOperationTrackerFactory(isPrimary), runtimeContextProvider.getLSMIOScheduler(),
+ runtimeContextProvider.getLSMMergePolicy(),
+ isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider.getLSMIOScheduler(),
runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(), fileSplits == null ? ioDeviceID
: fileSplits[partition].getIODeviceId());
return lsmBTree;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 743e047..5d9ac75 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -1,5 +1,7 @@
package edu.uci.ics.asterix.transaction.management.resource;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -43,27 +45,25 @@
IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
try {
if (isPartitioned) {
- return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCache,
- runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
- tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getIOManager(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(),
- runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
- runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
- fileSplits[partition].getIODeviceId());
+ return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
+ .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+ tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getIOManager(), filePath, runtimeContextProvider
+ .getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE),
+ runtimeContextProvider.getLSMIOScheduler(), runtimeContextProvider
+ .getLSMInvertedIndexIOOperationCallbackProvider(), fileSplits[partition]
+ .getIODeviceId());
} else {
- return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCache,
- runtimeContextProvider.getFileMapManager(), invListTypeTraits, invListCmpFactories,
- tokenTypeTraits, tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getIOManager(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(),
- runtimeContextProvider.getLSMInvertedIndexOperationTrackerFactory(),
- runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMInvertedIndexIOOperationCallbackProvider(),
- fileSplits[partition].getIODeviceId());
+ return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
+ .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+ tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getIOManager(), filePath, runtimeContextProvider
+ .getBloomFilterFalsePositiveRate(), runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE),
+ runtimeContextProvider.getLSMIOScheduler(), runtimeContextProvider
+ .getLSMInvertedIndexIOOperationCallbackProvider(), fileSplits[partition]
+ .getIODeviceId());
}
} catch (IndexException e) {
throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 3e45b77..8ad9225 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -2,6 +2,8 @@
import java.io.File;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -30,7 +32,8 @@
public LSMRTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] rtreeCmpFactories,
IBinaryComparatorFactory[] btreeCmpFactories, IPrimitiveValueProviderFactory[] valueProviderFactories,
- RTreePolicyType rtreePolicyType, ILinearizeComparatorFactory linearizeCmpFactory, FileSplit[] fileSplits, int datasetID) {
+ RTreePolicyType rtreePolicyType, ILinearizeComparatorFactory linearizeCmpFactory, FileSplit[] fileSplits,
+ int datasetID) {
super(datasetID);
this.typeTraits = typeTraits;
this.rtreeCmpFactories = rtreeCmpFactories;
@@ -50,12 +53,11 @@
return LSMRTreeUtils.createLSMTree(virtualBufferCache, runtimeContextProvider.getIOManager(), file,
runtimeContextProvider.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits,
rtreeCmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(),
- runtimeContextProvider.getLSMRTreeOperationTrackerFactory(),
- runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory,
- fileSplits[partition].getIODeviceId());
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
+ .getLSMMergePolicy(),
+ new BaseOperationTracker(LSMRTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
+ .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
+ linearizeCmpFactory, fileSplits[partition].getIODeviceId());
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index dc6f111..bfdda15 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -110,6 +110,9 @@
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
+ if (entityHashValue == 379839425) {
+ System.out.println("break");
+ }
internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 318996a..fc07457 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -27,6 +27,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -45,7 +46,6 @@
import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
import edu.uci.ics.asterix.common.transactions.ReusableLogContentObject;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -425,6 +425,14 @@
logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
if (logType != LogType.ENTITY_COMMIT) {
+ if (logType == LogType.COMMIT) {
+ int count = txnCtx.getActiveOperationCountOnIndexes();
+ map = activeTxnCountMaps.get(pageIndex);
+ if (map.containsKey(txnCtx)) {
+ count += (Integer) map.get(txnCtx);
+ }
+ map.put(txnCtx, count);
+ }
// release the ownership as the log record has been placed in
// created space.
logPages[pageIndex].decRefCnt();
@@ -733,6 +741,8 @@
return provider;
}
+ static AtomicInteger t = new AtomicInteger();
+
public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
ITransactionContext ctx = null;
int count = 0;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index fc9f11b..b59c48f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -2,9 +2,9 @@
import java.util.List;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 574487b..b262fab 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -35,6 +35,7 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.IBuffer;
@@ -48,7 +49,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 533f221..cf623a1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -1,11 +1,10 @@
package edu.uci.ics.asterix.transaction.management.service.transaction;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTracker;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -18,7 +17,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -26,7 +25,7 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
- ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerFactory,
+ ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider, ILSMOperationTrackerProvider,
ILSMIOOperationCallbackProvider {
private static final long serialVersionUID = 1L;
@@ -50,12 +49,9 @@
}
@Override
- public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
- if (isSecondary) {
- return new BaseOperationTracker(index, ioOpCallbackFactory);
- } else {
- return new PrimaryIndexOperationTracker(index, ioOpCallbackFactory);
- }
+ public ILSMOperationTracker createOperationTracker(IHyracksTaskContext ctx) {
+ assert isSecondary;
+ return new BaseOperationTracker(ioOpCallbackFactory);
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 7530f53..4bdad6b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -15,11 +15,11 @@
package edu.uci.ics.asterix.transaction.management.service.transaction;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
import java.util.Set;
+import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.ICloseable;
@@ -27,7 +27,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.BaseOperationTracker;
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -55,11 +54,11 @@
private JobId jobId;
// List of indexes on which operations were performed on behalf of this transaction.
- private final List<ILSMIndex> indexes = new ArrayList<ILSMIndex>();
+ private final Set<ILSMIndex> indexes = new HashSet<ILSMIndex>();
// List of operation callbacks corresponding to the operand indexes. In particular, needed to track
// the number of active operations contributed by this transaction.
- private final List<AbstractOperationCallback> callbacks = new ArrayList<AbstractOperationCallback>();
+ private final Set<AbstractOperationCallback> callbacks = new HashSet<AbstractOperationCallback>();
public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem) throws ACIDException {
this.jobId = jobId;
@@ -92,12 +91,33 @@
public void decreaseActiveTransactionCountOnIndexes() throws HyracksDataException {
synchronized (indexes) {
- for (int i = 0; i < indexes.size(); i++) {
- ILSMIndex index = indexes.get(i);
- IModificationOperationCallback modificationCallback = (IModificationOperationCallback) callbacks.get(i);
- ((BaseOperationTracker) index.getOperationTracker()).completeOperation(LSMOperationType.MODIFICATION,
- null, modificationCallback);
+ Set<BaseOperationTracker> opTrackers = new HashSet<BaseOperationTracker>();
+ Iterator<ILSMIndex> indexIt = indexes.iterator();
+ Iterator<AbstractOperationCallback> cbIt = callbacks.iterator();
+ while (indexIt.hasNext()) {
+ ILSMIndex index = indexIt.next();
+ opTrackers.add((BaseOperationTracker) index.getOperationTracker());
+ assert cbIt.hasNext();
}
+ Iterator<BaseOperationTracker> trackerIt = opTrackers.iterator();
+ while (trackerIt.hasNext()) {
+ IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
+ ((BaseOperationTracker) trackerIt.next()).completeOperation(null, LSMOperationType.MODIFICATION, null,
+ modificationCallback);
+ }
+ }
+ }
+
+ @Override
+ public int getActiveOperationCountOnIndexes() throws HyracksDataException {
+ synchronized (indexes) {
+ int count = 0;
+ Iterator<AbstractOperationCallback> cbIt = callbacks.iterator();
+ while (cbIt.hasNext()) {
+ IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
+ count += ((AbstractOperationCallback) modificationCallback).getLocalNumActiveOperations();
+ }
+ return count;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 85bdff8..c956367 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -106,8 +106,7 @@
//for entity-level commit
if (PKHashVal != -1) {
- boolean countIsZero = transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext,
- true);
+ boolean countIsZero = transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
if (!countIsZero) {
// Lock count != 0 for a particular entity implies that the entity has been locked
// more than once (probably due to a hash collision in our current model).