[ASTERIXDB-2231][STO] Separate primary op tracker for each partition
- user model changes: no
- storage format changes: no.
- interface changes: yes.
Details:
- Separate primary index operation tracker for each partition, instead
of having a global one on each NC to achieve better scalability.
- As a coordinated change, separate component id generator for each
partition as well.
- Add partition to transaction context so that transaction operations
can operate on proper op tracker.
- Fixes [ASTERIXDB-2232] to calculate dataset partitions correctly.
Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2263
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index c554cbd..366438a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -109,15 +109,15 @@
private ILSMMergePolicyFactory metadataMergePolicyFactory;
private final INCServiceContext ncServiceContext;
private final IResourceIdFactory resourceIdFactory;
- private CompilerProperties compilerProperties;
- private ExternalProperties externalProperties;
- private MetadataProperties metadataProperties;
- private StorageProperties storageProperties;
- private TransactionProperties txnProperties;
- private ActiveProperties activeProperties;
- private BuildProperties buildProperties;
- private ReplicationProperties replicationProperties;
- private MessagingProperties messagingProperties;
+ private final CompilerProperties compilerProperties;
+ private final ExternalProperties externalProperties;
+ private final MetadataProperties metadataProperties;
+ private final StorageProperties storageProperties;
+ private final TransactionProperties txnProperties;
+ private final ActiveProperties activeProperties;
+ private final BuildProperties buildProperties;
+ private final ReplicationProperties replicationProperties;
+ private final MessagingProperties messagingProperties;
private final NodeProperties nodeProperties;
private ExecutorService threadExecutor;
private IDatasetMemoryManager datasetMemoryManager;
@@ -373,8 +373,8 @@
}
@Override
- public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
- return datasetLifecycleManager.getOperationTracker(datasetID);
+ public ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int partition) {
+ return datasetLifecycleManager.getOperationTracker(datasetID, partition);
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a46b029..c6232f5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -144,7 +144,7 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -153,7 +153,7 @@
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
@@ -201,7 +201,7 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -227,7 +227,7 @@
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
@@ -276,7 +276,7 @@
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(
@@ -298,7 +298,7 @@
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
@@ -702,7 +702,7 @@
public void run() {
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
lsmAccessor.deleteComponents(predicate);
} catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 367d0b9..62705cc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -251,7 +251,7 @@
/**
* This test update partition 0, schedule flush and modify partition 1
- * Then ensure that in partition 1, primary and secondary have different component ids
+ * Then ensure that in partition 1, primary and secondary have the same component ids
*/
@Test
public void testAllocateWhileFlushIsScheduled() {
@@ -400,7 +400,8 @@
AtomicBoolean arrivedAtSchduleFlush = new AtomicBoolean(false);
AtomicBoolean finishedSchduleFlush = new AtomicBoolean(false);
MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
- addOpTrackerCallback(primaryLsmBtrees[0], new ITestOpCallback<Void>() {
+ // keep track of the flush of partition 1 since partitions 0 and 1 are flushed seperately
+ addOpTrackerCallback(primaryLsmBtrees[1], new ITestOpCallback<Void>() {
@Override
public void before(Void t) {
synchronized (arrivedAtSchduleFlush) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 4bfc581..c69ffe5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -22,7 +22,6 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
@@ -52,7 +51,7 @@
}
@Override
- public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
completedFlushes = 0;
completedMerges = 0;
rollbackFlushes = 0;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index e376ff9..9a528d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -33,9 +33,9 @@
private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
- public TestPrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+ public TestPrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
- super(datasetID, logManager, dsInfo, idGenerator);
+ super(datasetID, partition, logManager, dsInfo, idGenerator);
}
public void addCallback(ITestOpCallback<Void> callback) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 5d7a7c6..e6b34b8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -20,19 +20,22 @@
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.context.DatasetResource;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.common.IResource;
public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperationTrackerFactory {
private static final long serialVersionUID = 1L;
- private int datasetId;
+ private final int datasetId;
public TestPrimaryIndexOperationTrackerFactory(int datasetId) {
super(datasetId);
@@ -40,17 +43,19 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
try {
INcApplicationContext appCtx = (INcApplicationContext) ctx.getApplicationContext();
DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition);
if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
- Field opTrackerField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker");
- opTracker = new TestPrimaryIndexOperationTracker(datasetId,
- appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), dsr.getIdGenerator());
- setFinal(opTrackerField, dsr, opTracker);
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
+ appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
+ dslcManager.getComponentIdGenerator(datasetId, partition));
+ replaceMapEntry(opTrackersField, dsr, partition, opTracker);
}
return opTracker;
} catch (Exception e) {
@@ -65,4 +70,14 @@
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(obj, newValue);
}
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ static void replaceMapEntry(Field field, Object obj, Object key, Object value)
+ throws Exception, IllegalAccessException {
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ Map map = (Map) field.get(obj);
+ map.put(key, value);
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index a10c234..70e5f6e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -42,6 +42,7 @@
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.junit.After;
import org.junit.Assert;
@@ -203,8 +204,10 @@
((INcApplicationContext) integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
int maxMetadatasetId = 14;
for (int i = 1; i <= maxMetadatasetId; i++) {
- if (datasetLifecycleManager.getIndex(i, i) != null) {
- final PrimaryIndexOperationTracker opTracker = datasetLifecycleManager.getOperationTracker(i);
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(i, i);
+ if (index != null) {
+ final PrimaryIndexOperationTracker opTracker =
+ (PrimaryIndexOperationTracker) index.getOperationTracker();
Assert.assertEquals(0, opTracker.getNumActiveOperations());
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index a1978eb..6a70a29 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -170,7 +170,7 @@
final TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager();
final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options);
- txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true);
+ txnCtx.register(resourceId, 0, index, NoOpOperationCallback.INSTANCE, true);
return txnCtx;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 41c5ade..4441c6e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -75,17 +75,19 @@
* creates (if necessary) and returns the primary index operation tracker of a dataset.
*
* @param datasetId
+ * @param partition
* @return
*/
- PrimaryIndexOperationTracker getOperationTracker(int datasetId);
+ PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition);
/**
* creates (if necessary) and returns the component Id generator of a dataset.
*
* @param datasetId
+ * @param partition
* @return
*/
- ILSMComponentIdGenerator getComponentIdGenerator(int datasetId);
+ ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition);
/**
* creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8a83c7b..fffc170 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -66,7 +66,7 @@
IResourceIdFactory getResourceIdFactory();
- ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
+ ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int partition);
void initialize(boolean initialRun) throws IOException, ACIDException, AlgebricksException;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index e5fc998..41461ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.commons.lang3.tuple.Pair;
@@ -92,10 +91,10 @@
ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft());
ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight());
ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId());
- Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
- int partition = getIndexPartition(index, indexInfos);
- triggerScheduledMerge(targetId,
- indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
+ int partition = ((PrimaryIndexOperationTracker) index.getOperationTracker()).getPartition();
+ Set<ILSMIndex> indexes =
+ datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetPartitionOpenIndexes(partition);
+ triggerScheduledMerge(targetId, indexes);
return true;
}
@@ -107,11 +106,8 @@
* @param indexInfos
* @throws HyracksDataException
*/
- private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos)
- throws HyracksDataException {
- for (IndexInfo info : indexInfos) {
- ILSMIndex lsmIndex = info.getIndex();
-
+ private void triggerScheduledMerge(ILSMComponentId targetId, Set<ILSMIndex> indexes) throws HyracksDataException {
+ for (ILSMIndex lsmIndex : indexes) {
List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents());
if (isMergeOngoing(immutableComponents)) {
continue;
@@ -132,13 +128,4 @@
accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
}
}
-
- private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
- for (IndexInfo info : indexInfos) {
- if (info.getIndex() == index) {
- return info.getPartition();
- }
- }
- return -1;
- }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 9d63818..44baf77 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -30,6 +30,9 @@
public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private static final Logger LOGGER = LogManager.getLogger();
+ // partition -> index
+ private final Map<Integer, Set<IndexInfo>> partitionIndexes;
+ // resourceID -> index
private final Map<Long, IndexInfo> indexes;
private final int datasetID;
private int numActiveIOOps;
@@ -40,6 +43,7 @@
private boolean durable;
public DatasetInfo(int datasetID) {
+ this.partitionIndexes = new HashMap<>();
this.indexes = new HashMap<>();
this.setLastAccess(-1);
this.datasetID = datasetID;
@@ -69,26 +73,17 @@
notifyAll();
}
- public synchronized Set<ILSMIndex> getDatasetIndexes() {
- Set<ILSMIndex> datasetIndexes = new HashSet<>();
- for (IndexInfo iInfo : getIndexes().values()) {
- if (iInfo.isOpen()) {
- datasetIndexes.add(iInfo.getIndex());
+ public synchronized Set<ILSMIndex> getDatasetPartitionOpenIndexes(int partition) {
+ Set<ILSMIndex> indexSet = new HashSet<>();
+ Set<IndexInfo> partitionIndexInfos = this.partitionIndexes.get(partition);
+ if (partitionIndexInfos != null) {
+ for (IndexInfo iInfo : partitionIndexInfos) {
+ if (iInfo.isOpen()) {
+ indexSet.add(iInfo.getIndex());
+ }
}
}
-
- return datasetIndexes;
- }
-
- public synchronized Set<IndexInfo> getDatsetIndexInfos() {
- Set<IndexInfo> infos = new HashSet<>();
- for (IndexInfo iInfo : getIndexes().values()) {
- if (iInfo.isOpen()) {
- infos.add(iInfo);
- }
- }
-
- return infos;
+ return indexSet;
}
@Override
@@ -160,6 +155,18 @@
return indexes;
}
+ public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
+ indexes.put(resourceID, indexInfo);
+ partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
+ }
+
+ public synchronized void removeIndex(long resourceID) {
+ IndexInfo info = indexes.remove(resourceID);
+ if (info != null) {
+ partitionIndexes.get(info.getPartition()).remove(info);
+ }
+ }
+
public boolean isRegistered() {
return isRegistered;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 7b8397c..83e3144 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -21,9 +21,12 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.hyracks.storage.common.IResource;
/**
* This factory implementation is used by AsterixDB layer so that indexes of a dataset (/partition)
@@ -41,10 +44,12 @@
}
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+ public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource)
+ throws HyracksDataException {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
- return dslcManager.getComponentIdGenerator(datasetId);
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ return dslcManager.getComponentIdGenerator(datasetId, partition);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3a70515..1a61b8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -139,7 +140,7 @@
throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
if (LOGGER.isErrorEnabled()) {
final String logMsg = String.format(
@@ -155,7 +156,7 @@
DatasetInfo dsInfo = dsr.getDatasetInfo();
dsInfo.waitForIO();
closeIndex(iInfo);
- dsInfo.getIndexes().remove(resourceID);
+ dsInfo.removeIndex(resourceID);
if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
&& !dsInfo.isExternal()) {
removeDatasetFromCache(dsInfo.getDatasetID());
@@ -203,10 +204,7 @@
List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
Collections.sort(datasetsResources);
for (DatasetResource dsr : datasetsResources) {
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
- if (opTracker != null && opTracker.getNumActiveOperations() == 0
- && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen()
- && !dsr.isMetadataDataset()) {
+ if (isCandidateDatasetForEviction(dsr)) {
closeDataset(dsr);
LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
return true;
@@ -215,14 +213,18 @@
return false;
}
- private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
- if (iInfo.isOpen()) {
- ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
+ private boolean isCandidateDatasetForEviction(DatasetResource dsr) {
+ for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+ if (opTracker.getNumActiveOperations() != 0) {
+ return false;
+ }
+ }
+ if (dsr.getDatasetInfo().getReferenceCount() != 0 || !dsr.getDatasetInfo().isOpen()
+ || dsr.isMetadataDataset()) {
+ return false;
}
- // Wait for the above flush op.
- dsInfo.waitForIO();
+ return true;
}
public DatasetResource getDatasetLifecycle(int did) {
@@ -234,12 +236,9 @@
dsr = datasets.get(did);
if (dsr == null) {
DatasetInfo dsInfo = new DatasetInfo(did);
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
- PrimaryIndexOperationTracker opTracker =
- new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator);
DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
memoryManager.getNumPages(did), numPartitions);
- dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator);
+ dsr = new DatasetResource(dsInfo, vbcs);
datasets.put(did, dsr);
}
return dsr;
@@ -318,13 +317,33 @@
}
@Override
- public PrimaryIndexOperationTracker getOperationTracker(int datasetId) {
- return datasets.get(datasetId).getOpTracker();
+ public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) {
+ DatasetResource dataset = datasets.get(datasetId);
+ PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
+ if (opTracker == null) {
+ populateOpTrackerAndIdGenerator(dataset, partition);
+ opTracker = dataset.getOpTracker(partition);
+ }
+ return opTracker;
}
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) {
- return datasets.get(datasetId).getIdGenerator();
+ public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
+ DatasetResource dataset = datasets.get(datasetId);
+ ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
+ if (generator == null) {
+ populateOpTrackerAndIdGenerator(dataset, partition);
+ generator = dataset.getComponentIdGenerator(partition);
+ }
+ return generator;
+ }
+
+ private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) {
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+ PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
+ logManager, dataset.getDatasetInfo(), idGenerator);
+ dataset.setPrimaryIndexOperationTracker(partition, opTracker);
+ dataset.setIdGenerator(partition, idGenerator);
}
private void validateDatasetLifecycleManagerState() throws HyracksDataException {
@@ -357,31 +376,40 @@
public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
//schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
for (DatasetResource dsr : datasets.values()) {
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
- synchronized (opTracker) {
- for (IndexInfo iInfo : dsr.getIndexes().values()) {
- AbstractLSMIOOperationCallback ioCallback =
- (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
- if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
- || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
- long firstLSN = ioCallback.getFirstLSN();
- if (firstLSN < targetLSN) {
- LOGGER.info("Checkpoint flush dataset {}", dsr.getDatasetID());
- opTracker.setFlushOnExit(true);
- if (opTracker.getNumActiveOperations() == 0) {
- // No Modify operations currently, we need to trigger the flush and we can do so safely
- opTracker.flushIfRequested();
- }
- break;
- }
+ for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+ // check all partitions
+ synchronized (opTracker) {
+ scheduleAsyncFlushForLaggingDatasetPartition(dsr, opTracker, targetLSN);
+ }
+ }
+ }
+ }
+
+ private void scheduleAsyncFlushForLaggingDatasetPartition(DatasetResource dsr,
+ PrimaryIndexOperationTracker opTracker, long targetLSN) throws HyracksDataException {
+ int partition = opTracker.getPartition();
+ for (ILSMIndex lsmIndex : dsr.getDatasetInfo().getDatasetPartitionOpenIndexes(partition)) {
+ AbstractLSMIOOperationCallback ioCallback =
+ (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
+ if (!(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
+ || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
+ long firstLSN = ioCallback.getFirstLSN();
+ if (firstLSN < targetLSN) {
+ LOGGER.info("Checkpoint flush dataset {} partition {}", dsr.getDatasetID(), partition);
+ opTracker.setFlushOnExit(true);
+ if (opTracker.getNumActiveOperations() == 0) {
+ // No Modify operations currently, we need to trigger the flush and we can do so safely
+ opTracker.flushIfRequested();
}
+ break;
}
}
}
}
/*
- * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
+ * This method can only be called asynchronously safely if we're sure no modify operation
+ * will take place until the flush is scheduled
*/
private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
DatasetInfo dsInfo = dsr.getDatasetInfo();
@@ -389,53 +417,61 @@
// no memory components for external dataset
return;
}
- PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker();
- if (primaryOpTracker.getNumActiveOperations() > 0) {
- throw new IllegalStateException(
- "flushDatasetOpenIndexes is called on a dataset with currently active operations");
- }
+ for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+ // flush each partition one by one
+ if (primaryOpTracker.getNumActiveOperations() > 0) {
+ throw new IllegalStateException(
+ "flushDatasetOpenIndexes is called on a dataset with currently active operations");
+ }
+ int partition = primaryOpTracker.getPartition();
+ Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+ ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition);
+ idGenerator.refresh();
- ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
- idGenerator.refresh();
+ if (dsInfo.isDurable()) {
+ synchronized (logRecord) {
+ TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null);
+ try {
+ logManager.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException("could not write flush log while closing dataset", e);
+ }
- if (dsInfo.isDurable()) {
- synchronized (logRecord) {
- TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null);
- try {
- logManager.log(logRecord);
- } catch (ACIDException e) {
- throw new HyracksDataException("could not write flush log while closing dataset", e);
+ try {
+ //notification will come from LogBuffer class (notifyFlushTerminator)
+ logRecord.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
}
+ }
+ for (ILSMIndex index : indexes) {
+ //update resource lsn
+ AbstractLSMIOOperationCallback ioOpCallback =
+ (AbstractLSMIOOperationCallback) index.getIOOperationCallback();
+ ioOpCallback.updateLastLSN(logRecord.getLSN());
+ }
- try {
- //notification will come from LogPage class (notifyFlushTerminator)
- logRecord.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ if (asyncFlush) {
+ for (ILSMIndex index : indexes) {
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleFlush(index.getIOOperationCallback());
+ }
+ } else {
+ for (ILSMIndex index : indexes) {
+ // TODO: This is not efficient since we flush the indexes sequentially.
+ // Think of a way to allow submitting the flush requests concurrently.
+ // We don't do them concurrently because this may lead to a deadlock scenario
+ // between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleFlush(index.getIOOperationCallback());
+ // Wait for the above flush op.
+ dsInfo.waitForIO();
}
}
}
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- //update resource lsn
- AbstractLSMIOOperationCallback ioOpCallback =
- (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
- ioOpCallback.updateLastLSN(logRecord.getLSN());
- }
-
- if (asyncFlush) {
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
- }
- } else {
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- // TODO: This is not efficient since we flush the indexes sequentially.
- // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
- // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
- flushAndWaitForIO(dsInfo, iInfo);
- }
- }
}
private void closeDataset(DatasetResource dsr) throws HyracksDataException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index c02de7e..8dcae23 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.context;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
@@ -41,17 +43,16 @@
*/
public class DatasetResource implements Comparable<DatasetResource> {
private final DatasetInfo datasetInfo;
- private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
- private final ILSMComponentIdGenerator datasetComponentIdGenerator;
- public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker,
- DatasetVirtualBufferCaches datasetVirtualBufferCaches,
- ILSMComponentIdGenerator datasetComponentIdGenerator) {
+ private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
+ private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
+
+ public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
this.datasetInfo = datasetInfo;
- this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
- this.datasetComponentIdGenerator = datasetComponentIdGenerator;
+ this.datasetPrimaryOpTrackers = new HashMap<>();
+ this.datasetComponentIdGenerators = new HashMap<>();
}
public boolean isRegistered() {
@@ -108,7 +109,8 @@
if (index == null) {
throw new HyracksDataException("Attempt to register a null index");
}
- datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
+
+ datasetInfo.addIndex(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
((DatasetLocalResource) resource.getResource()).getPartition()));
}
@@ -116,12 +118,31 @@
return datasetInfo;
}
- public PrimaryIndexOperationTracker getOpTracker() {
- return datasetPrimaryOpTracker;
+ public PrimaryIndexOperationTracker getOpTracker(int partition) {
+ return datasetPrimaryOpTrackers.get(partition);
}
- public ILSMComponentIdGenerator getIdGenerator() {
- return datasetComponentIdGenerator;
+ public Collection<PrimaryIndexOperationTracker> getOpTrackers() {
+ return datasetPrimaryOpTrackers.values();
+ }
+
+ public ILSMComponentIdGenerator getComponentIdGenerator(int partition) {
+ return datasetComponentIdGenerators.get(partition);
+ }
+
+ public void setPrimaryIndexOperationTracker(int partition, PrimaryIndexOperationTracker opTracker) {
+ if (datasetPrimaryOpTrackers.containsKey(partition)) {
+ throw new IllegalStateException(
+ "PrimaryIndexOperationTracker has already been set for partition " + partition);
+ }
+ datasetPrimaryOpTrackers.put(partition, opTracker);
+ }
+
+ public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) {
+ if (datasetComponentIdGenerators.containsKey(partition)) {
+ throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition);
+ }
+ datasetComponentIdGenerators.put(partition, idGenerator);
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 5170310..1a76b66 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -43,6 +43,7 @@
public class PrimaryIndexOperationTracker extends BaseOperationTracker {
+ private final int partition;
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
private final ILogManager logManager;
@@ -50,9 +51,10 @@
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
- public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+ public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
super(datasetID, dsInfo);
+ this.partition = partition;
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
this.idGenerator = idGenerator;
@@ -100,7 +102,7 @@
// or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it
boolean needsFlush = false;
- Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes();
+ Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
if (!flushOnExit) {
for (ILSMIndex lsmIndex : indexes) {
@@ -146,7 +148,7 @@
//This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
idGenerator.refresh();
- for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
+ for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
//get resource
ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
//update resource lsn
@@ -199,4 +201,8 @@
return flushLogCreated;
}
+ public int getPartition() {
+ return partition;
+ }
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index ed56ab1..5b9883c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -24,11 +24,13 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.common.IResource;
public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
@@ -38,17 +40,20 @@
protected transient INCServiceContext ncCtx;
+ protected transient IResource resource;
+
public AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
this.idGeneratorFactory = idGeneratorFactory;
}
@Override
- public void initialize(INCServiceContext ncCtx) {
+ public void initialize(INCServiceContext ncCtx, IResource resource) {
this.ncCtx = ncCtx;
+ this.resource = resource;
}
- protected ILSMComponentIdGenerator getComponentIdGenerator() {
- return idGeneratorFactory.getComponentIdGenerator(ncCtx);
+ protected ILSMComponentIdGenerator getComponentIdGenerator() throws HyracksDataException {
+ return idGeneratorFactory.getComponentIdGenerator(ncCtx, resource);
}
protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
@@ -60,7 +65,7 @@
private static final long serialVersionUID = 1L;
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+ public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) {
// used for backward compatibility
// if idGeneratorFactory is not set for legacy lsm indexes, we return a default
// component id generator which always generates the missing component id.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 95245cb..97badb2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 6c75ed6..9b32345 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -31,7 +32,7 @@
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator(),
getIndexCheckpointManagerProvider());
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index fb73d19..766ef95 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator(),
getIndexCheckpointManagerProvider());
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 94be0bb..3a0afa8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index c4a2d03..a3d5bc5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -38,11 +38,13 @@
* transaction.
*
* @param resourceId
+ * @param partition
* @param index
* @param callback
* @param primaryIndex
*/
- void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex);
+ void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
+ boolean primaryIndex);
/**
* Gets the unique transaction id.
@@ -135,8 +137,10 @@
* Called to notify the transaction that an entity commit
* log belonging to this transaction has been flushed to
* disk.
+ *
+ * @param partition
*/
- void notifyEntityCommitted();
+ void notifyEntityCommitted(int partition);
/**
* Called after an operation is performed on index
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 3aa7b17..f9f742a 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -22,15 +22,14 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -60,6 +59,8 @@
private final int DATASET_ID = 1;
+ private long nextResourceId = 0;
+
@Test
public void testBasic() {
try {
@@ -183,19 +184,15 @@
}
}
- private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+ private ILSMMergePolicy mockMergePolicy(IndexInfo... indexInfos) {
Map<String, String> properties = new HashMap<>();
properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
- Set<IndexInfo> indexInfos = new HashSet<>();
- for (IndexInfo info : indexes) {
- indexInfos.add(info);
+ DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+ for (IndexInfo index : indexInfos) {
+ dsInfo.addIndex(index.getResourceId(), index);
}
-
- DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class);
- Mockito.when(dsInfo.getDatsetIndexInfos()).thenReturn(indexInfos);
-
IDatasetLifecycleManager manager = Mockito.mock(IDatasetLifecycleManager.class);
Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo);
@@ -238,8 +235,16 @@
Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+ if (isPrimary) {
+ PrimaryIndexOperationTracker opTracker = Mockito.mock(PrimaryIndexOperationTracker.class);
+ Mockito.when(opTracker.getPartition()).thenReturn(partition);
+ Mockito.when(index.getOperationTracker()).thenReturn(opTracker);
+ }
final LocalResource localResource = Mockito.mock(LocalResource.class);
- return new IndexInfo(index, DATASET_ID, localResource, partition);
+ Mockito.when(localResource.getId()).thenReturn(nextResourceId++);
+ IndexInfo indexInfo = new IndexInfo(index, DATASET_ID, localResource, partition);
+ indexInfo.setOpen(true);
+ return indexInfo;
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 6634e51..e8f2595 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -42,6 +42,7 @@
import org.apache.asterix.common.transactions.ImmutableDatasetId;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
@@ -474,7 +475,9 @@
IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
txnCtx.setWriteTxn(true);
- txnCtx.register(metadataIndex.getResourceId(), lsmIndex, modCallback, metadataIndex.isPrimaryIndex());
+ txnCtx.register(metadataIndex.getResourceId(),
+ StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, modCallback,
+ metadataIndex.isPrimaryIndex());
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
switch (op) {
case INSERT:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 9753bcf..9ebd21b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -63,7 +63,6 @@
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
@@ -476,4 +475,5 @@
public static void setNewUniverse(boolean isNewUniverse) {
MetadataBootstrap.isNewUniverse = isNewUniverse;
}
+
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index ea2d715..8cd7053 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,6 +42,7 @@
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
@@ -818,6 +819,10 @@
protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException {
FileSplit[] splitsForDataset =
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, getDatasetName());
- return IntStream.range(0, splitsForDataset.length).toArray();
+ int[] partitions = new int[splitsForDataset.length];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = StoragePathUtil.getPartitionNumFromRelativePath(splitsForDataset[i].getPath());
+ }
+ return partitions;
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 7913d48..6faffc7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,13 +18,12 @@
*/
package org.apache.asterix.runtime.job.listener;
-import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
-
import org.apache.asterix.common.api.IJobEventListenerFactory;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.context.IHyracksJobletContext;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index d057c50..10c6b8f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -73,7 +73,7 @@
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resource.getId(), aResource.getPartition(), resourceType, indexOp, operatorNodePushable);
- txnCtx.register(resource.getId(), index, modCallback, true);
+ txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true);
return modCallback;
} catch (ACIDException e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index f40140a..eef1cb0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -21,9 +21,12 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResource;
public class PrimaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
@@ -36,10 +39,12 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource)
+ throws HyracksDataException {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
- return dslcManager.getOperationTracker(datasetId);
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ return dslcManager.getOperationTracker(datasetId, partition);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 26e1b22..a927da0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -69,7 +69,7 @@
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resource.getId(), aResource.getPartition(), resourceType, indexOp);
- txnCtx.register(resource.getId(), index, modCallback, false);
+ txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, false);
return modCallback;
} catch (ACIDException e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index febcac2..7586980 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -18,12 +18,13 @@
*/
package org.apache.asterix.transaction.management.opcallbacks;
-import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.BaseOperationTracker;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResource;
public class SecondaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
@@ -36,7 +37,7 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
return new BaseOperationTracker(datasetID, dslcManager.getDatasetInfo(datasetID));
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 1449a1b..8f7d445 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -68,7 +68,7 @@
IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId),
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
aResource.getPartition(), resourceType, indexOp);
- txnCtx.register(resource.getId(), index, modCallback, true);
+ txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true);
return modCallback;
} catch (ACIDException e) {
throw new HyracksDataException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
index 8724128..9ce5843 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/DatasetLocalResourceFactory.java
@@ -37,9 +37,8 @@
@Override
public IResource createResource(FileReference fileRef) {
- IResource resource = resourceFactory.createResource(fileRef);
- // Currently, we get the partition number from the relative path
int partition = StoragePathUtil.getPartitionNumFromRelativePath(fileRef.getRelativePath());
+ IResource resource = resourceFactory.createResource(fileRef);
return new DatasetLocalResource(datasetId, partition, resource);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index bc487fe..a630caa 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -222,7 +222,7 @@
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(),
LockMode.ANY, txnCtx);
- txnCtx.notifyEntityCommitted();
+ txnCtx.notifyEntityCommitted(logRecord.getResourcePartition());
if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
txnSubsystem.incrementEntityCommitCount();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 43fe266..b3d5e49 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -118,7 +118,7 @@
}
@Override
- public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+ public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
boolean primaryIndex) {
synchronized (txnOpTrackers) {
if (!txnOpTrackers.containsKey(resourceId)) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 1d132a8..219cf07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -44,9 +44,9 @@
}
@Override
- public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+ public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
boolean primaryIndex) {
- super.register(resourceId, index, callback, primaryIndex);
+ super.register(resourceId, partition, index, callback, primaryIndex);
synchronized (txnOpTrackers) {
if (primaryIndex && !opTrackers.containsKey(resourceId)) {
opTrackers.put(resourceId, index.getOperationTracker());
@@ -67,7 +67,7 @@
}
@Override
- public void notifyEntityCommitted() {
+ public void notifyEntityCommitted(int partition) {
throw new IllegalStateException("Unexpected entity commit in atomic transaction");
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index e195451..9d2f54b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -18,11 +18,15 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -32,30 +36,36 @@
@ThreadSafe
public class EntityLevelTransactionContext extends AbstractTransactionContext {
- private PrimaryIndexOperationTracker primaryIndexOpTracker;
- private IModificationOperationCallback primaryIndexCallback;
- private final AtomicInteger pendingOps;
+ private final Map<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>> primaryIndexTrackers;
+ private final Map<Long, AtomicInteger> resourcePendingOps;
+ private final Map<Integer, AtomicInteger> partitionPendingOps;
public EntityLevelTransactionContext(TxnId txnId) {
super(txnId);
- pendingOps = new AtomicInteger(0);
+ this.primaryIndexTrackers = new HashMap<>();
+ this.resourcePendingOps = new HashMap<>();
+ this.partitionPendingOps = new HashMap<>();
}
@Override
- public void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback,
+ public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
boolean primaryIndex) {
- super.register(resourceId, index, callback, primaryIndex);
+ super.register(resourceId, partition, index, callback, primaryIndex);
synchronized (txnOpTrackers) {
- if (primaryIndex && primaryIndexOpTracker == null) {
- primaryIndexCallback = callback;
- primaryIndexOpTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+ AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
+ resourcePendingOps.put(resourceId, pendingOps);
+ if (primaryIndex) {
+ Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+ new Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>(
+ (PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
+ primaryIndexTrackers.put(partition, pair);
}
}
}
@Override
public void beforeOperation(long resourceId) {
- pendingOps.incrementAndGet();
+ resourcePendingOps.get(resourceId).incrementAndGet();
}
@Override
@@ -64,9 +74,11 @@
}
@Override
- public void notifyEntityCommitted() {
+ public void notifyEntityCommitted(int partition) {
try {
- primaryIndexOpTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, primaryIndexCallback);
+ Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+ primaryIndexTrackers.get(partition);
+ pair.first.completeOperation(null, LSMOperationType.MODIFICATION, null, pair.second);
} catch (HyracksDataException e) {
throw new ACIDException(e);
}
@@ -74,13 +86,15 @@
@Override
public void afterOperation(long resourceId) {
- pendingOps.decrementAndGet();
+ resourcePendingOps.get(resourceId).decrementAndGet();
}
@Override
protected void cleanupForAbort() {
- if (primaryIndexOpTracker != null) {
- primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
+ for (Entry<Integer, Pair<PrimaryIndexOperationTracker, IModificationOperationCallback>> e : primaryIndexTrackers
+ .entrySet()) {
+ AtomicInteger pendingOps = partitionPendingOps.get(e.getKey());
+ e.getValue().first.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
index 673bd3b..445f363 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java
@@ -57,11 +57,11 @@
public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
- ioOpCallbackFactory.initialize(serviceCtx);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
return LSMBTreeUtil.createExternalBTree(ioManager, file, storageManager.getBufferCache(serviceCtx), typeTraits,
cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
- opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, durable, metadataPageManagerFactory, serviceCtx.getTracer());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
index 7e44c63..acdb09e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java
@@ -60,11 +60,11 @@
public ILSMIndex createInstance(INCServiceContext serviceCtx) throws HyracksDataException {
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
- ioOpCallbackFactory.initialize(serviceCtx);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
return LSMBTreeUtil.createExternalBTreeWithBuddy(ioManager, file, storageManager.getBufferCache(serviceCtx),
typeTraits, cmpFactories, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
- opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, bloomFilterKeyFields, durable, metadataPageManagerFactory, serviceCtx.getTracer());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 1988736..f0b86d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -70,13 +70,13 @@
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
- ioOpCallbackFactory.initialize(serviceCtx);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
//TODO: enable updateAwareness for secondary LSMBTree indexes
boolean updateAware = false;
return LSMBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits,
cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
- opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 280803d..92d74d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -94,7 +94,8 @@
ILSMDiskComponentFactory bulkLoadComponentFactory, ILSMDiskComponentFactory transactionComponentFactory,
double bloomFilterFalsePositiveRate, IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, ITracer tracer) {
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable, ITracer tracer)
+ throws HyracksDataException {
super(ioManager, insertLeafFrameFactory, deleteLeafFrameFactory, bufferCache, fileManager, componentFactory,
bulkLoadComponentFactory, bloomFilterFalsePositiveRate, cmpFactories, mergePolicy, opTracker,
ioScheduler, ioOpCallbackFactory, false, durable, tracer);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 5f04c0a..6993013 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -89,8 +89,8 @@
ILSMDiskComponentFactory bulkLoadComponentFactory, double bloomFilterFalsePositiveRate,
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, IBinaryComparatorFactory[] btreeCmpFactories,
- IBinaryComparatorFactory[] buddyBtreeCmpFactories, int[] buddyBTreeFields, boolean durable,
- ITracer tracer) {
+ IBinaryComparatorFactory[] buddyBtreeCmpFactories, int[] buddyBTreeFields, boolean durable, ITracer tracer)
+ throws HyracksDataException {
super(ioManager, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker,
ioScheduler, ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable, tracer);
this.btreeCmpFactories = btreeCmpFactories;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 0593ad5..b4990d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -125,7 +125,7 @@
double bloomFilterFalsePositiveRate, IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy,
ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, boolean durable,
- ITracer tracer) {
+ ITracer tracer) throws HyracksDataException {
super(ioManager, bufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler,
ioOpCallbackFactory, componentFactory, bulkLoadComponentFactory, durable, tracer);
this.insertLeafFrameFactory = insertLeafFrameFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index 4706faa..cc10a98 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -135,7 +135,7 @@
int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy,
ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable,
- IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer) {
+ IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer) throws HyracksDataException {
LSMBTreeTupleWriterFactory insertTupleWriterFactory =
new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, false, false);
LSMBTreeTupleWriterFactory deleteTupleWriterFactory =
@@ -187,8 +187,8 @@
IBufferCache diskBufferCache, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
- int[] buddyBTreeFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory,
- ITracer tracer) {
+ int[] buddyBTreeFields, boolean durable, IMetadataPageManagerFactory freePageManagerFactory, ITracer tracer)
+ throws HyracksDataException {
ITypeTraits[] buddyBtreeTypeTraits = new ITypeTraits[buddyBTreeFields.length];
IBinaryComparatorFactory[] buddyBtreeCmpFactories = new IBinaryComparatorFactory[buddyBTreeFields.length];
for (int i = 0; i < buddyBtreeTypeTraits.length; i++) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
index c0f530b..4ec82c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGeneratorFactory.java
@@ -21,9 +21,11 @@
import java.io.Serializable;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
@FunctionalInterface
public interface ILSMComponentIdGeneratorFactory extends Serializable {
-
- ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx);
+ ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource)
+ throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
index a9dc50e..e8742b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -21,14 +21,16 @@
import java.io.Serializable;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
public interface ILSMIOOperationCallbackFactory extends Serializable {
/**
- * Initialize the callback factory with the given ncCtx
+ * Initialize the callback factory with the given ncCtx and resource
*
* @param ncCtx
*/
- void initialize(INCServiceContext ncCtx);
+ void initialize(INCServiceContext ncCtx, IResource resource);
- ILSMIOOperationCallback createIoOpCallback(ILSMIndex index);
+ ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
index 217f794..ef22620 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
@@ -21,8 +21,10 @@
import java.io.Serializable;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
@FunctionalInterface
public interface ILSMOperationTrackerFactory extends Serializable {
- ILSMOperationTracker getOperationTracker(INCServiceContext ctx);
+ ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 6c1ef55..5a95af0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -112,7 +112,7 @@
ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMDiskComponentFactory componentFactory,
ILSMDiskComponentFactory bulkLoadComponentFactory, ILSMComponentFilterFrameFactory filterFrameFactory,
LSMComponentFilterManager filterManager, int[] filterFields, boolean durable,
- IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer) {
+ IComponentFilterHelper filterHelper, int[] treeFields, ITracer tracer) throws HyracksDataException {
this.ioManager = ioManager;
this.virtualBufferCaches = virtualBufferCaches;
this.diskBufferCache = diskBufferCache;
@@ -146,7 +146,7 @@
double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory bulkLoadComponentFactory,
- boolean durable, ITracer tracer) {
+ boolean durable, ITracer tracer) throws HyracksDataException {
this.ioManager = ioManager;
this.diskBufferCache = diskBufferCache;
this.fileManager = fileManager;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
index 728c90a..d288ec8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGeneratorFactory.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.hyracks.storage.common.IResource;
/**
* A default implementation of {@link ILSMComponentIdGeneratorFactory}.
@@ -32,7 +33,7 @@
private static final long serialVersionUID = 1L;
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+ public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) {
return new LSMComponentIdGenerator();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
index 3a58c19..eec2dca 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallbackFactory.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.common.IResource;
public enum NoOpIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
INSTANCE;
@@ -39,7 +40,7 @@
}
@Override
- public void initialize(INCServiceContext ncCtx) {
+ public void initialize(INCServiceContext ncCtx, IResource resource) {
// No op
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
index 55a2164..8d8e763 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
/**
@@ -43,7 +44,7 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
return tracker;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
index d01e7ba..6c17ee8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResource;
public class ThreadCountingOperationTrackerFactory implements ILSMOperationTrackerFactory {
@@ -32,7 +33,7 @@
}
@Override
- public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
return new ThreadCountingTracker();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
index a45f006..4eb7728 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexLocalResource.java
@@ -87,19 +87,19 @@
IBufferCache bufferCache = storageManager.getBufferCache(serviceCtx);
ILSMMergePolicy mergePolicy = mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx);
ILSMIOOperationScheduler ioScheduler = ioSchedulerProvider.getIoScheduler(serviceCtx);
- ioOpCallbackFactory.initialize(serviceCtx);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
if (isPartitioned) {
return InvertedIndexUtils.createPartitionedLSMInvertedIndex(ioManager, virtualBufferCaches, typeTraits,
cmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache,
file.getAbsolutePath(), bloomFilterFalsePositiveRate, mergePolicy,
- opTrackerProvider.getOperationTracker(serviceCtx), ioScheduler, ioOpCallbackFactory,
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioScheduler, ioOpCallbackFactory,
invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, durable,
metadataPageManagerFactory);
} else {
return InvertedIndexUtils.createLSMInvertedIndex(ioManager, virtualBufferCaches, typeTraits, cmpFactories,
tokenTypeTraits, tokenCmpFactories, tokenizerFactory, bufferCache, file.getAbsolutePath(),
- bloomFilterFalsePositiveRate, mergePolicy, opTrackerProvider.getOperationTracker(serviceCtx),
+ bloomFilterFalsePositiveRate, mergePolicy, opTrackerProvider.getOperationTracker(serviceCtx, this),
ioScheduler, ioOpCallbackFactory, invertedIndexFields, filterTypeTraits, filterCmpFactories,
filterFields, filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, durable,
metadataPageManagerFactory);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
index 9960590..f72e17c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/ExternalRTreeLocalResource.java
@@ -66,13 +66,13 @@
public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
IIOManager ioManager = ncServiceCtx.getIoManager();
FileReference fileRef = ioManager.resolve(path);
- ioOpCallbackFactory.initialize(ncServiceCtx);
+ ioOpCallbackFactory.initialize(ncServiceCtx, this);
return LSMRTreeUtils.createExternalRTree(ioManager, fileRef, storageManager.getBufferCache(ncServiceCtx),
typeTraits, cmpFactories, btreeCmpFactories, valueProviderFactories, rtreePolicyType,
bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx),
- opTrackerProvider.getOperationTracker(ncServiceCtx), ioSchedulerProvider.getIoScheduler(ncServiceCtx),
- ioOpCallbackFactory, linearizeCmpFactory, buddyBTreeFields, durable, isPointMBR,
- metadataPageManagerFactory, ncServiceCtx.getTracer());
+ opTrackerProvider.getOperationTracker(ncServiceCtx, this),
+ ioSchedulerProvider.getIoScheduler(ncServiceCtx), ioOpCallbackFactory, linearizeCmpFactory,
+ buddyBTreeFields, durable, isPointMBR, metadataPageManagerFactory, ncServiceCtx.getTracer());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
index f6396cf..634504b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeLocalResource.java
@@ -83,13 +83,14 @@
IIOManager ioManager = ncServiceCtx.getIoManager();
FileReference fileRef = ioManager.resolve(path);
List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(ncServiceCtx, fileRef);
- ioOpCallbackFactory.initialize(ncServiceCtx);
+ ioOpCallbackFactory.initialize(ncServiceCtx, this);
return LSMRTreeUtils.createLSMTree(ioManager, virtualBufferCaches, fileRef,
storageManager.getBufferCache(ncServiceCtx), typeTraits, cmpFactories, btreeCmpFactories,
valueProviderFactories, rtreePolicyType, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, ncServiceCtx),
- opTrackerProvider.getOperationTracker(ncServiceCtx), ioSchedulerProvider.getIoScheduler(ncServiceCtx),
- ioOpCallbackFactory, linearizeCmpFactory, rtreeFields, buddyBTreeFields, filterTypeTraits,
- filterCmpFactories, filterFields, durable, isPointMBR, metadataPageManagerFactory);
+ opTrackerProvider.getOperationTracker(ncServiceCtx, this),
+ ioSchedulerProvider.getIoScheduler(ncServiceCtx), ioOpCallbackFactory, linearizeCmpFactory, rtreeFields,
+ buddyBTreeFields, filterTypeTraits, filterCmpFactories, filterFields, durable, isPointMBR,
+ metadataPageManagerFactory);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
index f586d51..f91a5f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/dataflow/LSMRTreeWithAntiMatterLocalResource.java
@@ -78,12 +78,12 @@
IIOManager ioManager = serviceCtx.getIoManager();
FileReference file = ioManager.resolve(path);
List<IVirtualBufferCache> virtualBufferCaches = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
- ioOpCallbackFactory.initialize(serviceCtx);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
return LSMRTreeUtils.createLSMTreeWithAntiMatterTuples(ioManager, virtualBufferCaches, file,
storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, btreeComparatorFactories,
valueProviderFactories, rtreePolicyType,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
- opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, linearizeCmpFactory, rtreeFields, filterTypeTraits, filterCmpFactories,
filterFields, durable, isPointMBR, metadataPageManagerFactory);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index aac8905..ee37043 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -127,7 +127,7 @@
ILinearizeComparatorFactory linearizer, int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray,
double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean durable,
- boolean isPointMBR, ITracer tracer) {
+ boolean isPointMBR, ITracer tracer) throws HyracksDataException {
super(ioManager, diskBufferCache, fileManager, bloomFilterFalsePositiveRate, mergePolicy, opTracker,
ioScheduler, ioOpCallbackFactory, componentFactory, componentFactory, durable, tracer);
this.rtreeInteriorFrameFactory = rtreeInteriorFrameFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index e23a83e..f29bffc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -87,7 +87,7 @@
int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMMergePolicy mergePolicy,
ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, int[] buddyBTreeFields, boolean durable,
- boolean isPointMBR, ITracer tracer) {
+ boolean isPointMBR, ITracer tracer) throws HyracksDataException {
super(ioManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory,
btreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, bloomFilterFalsePositiveRate,
rtreeCmpFactories, btreeCmpFactories, linearizer, comparatorFields, linearizerArray, mergePolicy,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 123b38d..7172b74 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -100,7 +100,7 @@
int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, ILSMMergePolicy mergePolicy,
ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory, int[] buddyBTreeFields, boolean durable,
- boolean isPointMBR, ITracer tracer) {
+ boolean isPointMBR, ITracer tracer) throws HyracksDataException {
super(ioManager, rtreeInteriorFrameFactory, rtreeLeafFrameFactory, btreeInteriorFrameFactory,
btreeLeafFrameFactory, diskBufferCache, fileNameManager, componentFactory, rtreeCmpFactories,
btreeCmpFactories, linearizer, comparatorFields, linearizerArray, bloomFilterFalsePositiveRate,
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
index 1af6779..9f17efa 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -49,7 +49,7 @@
harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes),
SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
- NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(),
+ NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index 0904806..77d52bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -59,7 +59,7 @@
harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes),
SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
- NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(),
+ NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
harness.getMetadataPageManagerFactory(), false, ITracer.NONE);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
index ccbaa9c..d42c3b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
@@ -71,7 +71,7 @@
harness.getFileReference(), harness.getDiskBufferCache(), SerdeUtils.serdesToTypeTraits(keySerdes),
SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), bloomFilterKeyFields,
harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(),
- NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null), harness.getIOScheduler(),
+ NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(),
harness.getIOOperationCallbackFactory(), true, null, null, null, null, true,
harness.getMetadataPageManagerFactory(), true, ITracer.NONE);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index 9b53120..3ce24b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -67,11 +67,11 @@
vbcs.add(i, new TestVirtualBufferCache(vbc));
}
}
- ioOpCallbackFactory.initialize(serviceCtx);
+ ioOpCallbackFactory.initialize(serviceCtx, this);
return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
- opTrackerProvider.getOperationTracker(serviceCtx), ioSchedulerProvider.getIoScheduler(serviceCtx),
+ opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields,
durable, metadataPageManagerFactory, false, serviceCtx.getTracer());
}