merged from master to master_abort_recovery_fix2
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 6dd11bd..32bfa58 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -72,7 +72,7 @@
@Override
public void open() throws HyracksDataException {
try {
- transactionContext = transactionManager.getTransactionContext(jobId);
+ transactionContext = transactionManager.getTransactionContext(jobId, false);
transactionContext.setWriteTxn(isWriteTransaction);
} catch (ACIDException e) {
throw new HyracksDataException(e);
@@ -89,7 +89,11 @@
pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
primaryKeyFields);
- logMgr.log(logRecord);
+ try {
+ logMgr.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
}
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 56fc0e7..af503a6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -220,17 +220,15 @@
dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
}
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index ee9dfae..b528381 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -18,10 +18,8 @@
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -95,26 +93,6 @@
}
@Override
- public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
- }
-
- @Override
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 3e41a77..ccc5e85 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -30,6 +30,7 @@
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -127,8 +128,7 @@
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
@@ -180,7 +180,7 @@
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
@@ -267,7 +267,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
@@ -293,7 +293,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index af56894..bf31db7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -71,9 +71,8 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
- .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()));
AlgebricksPartitionConstraintHelper
.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index e3832d4..3ae5f6d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -66,9 +66,8 @@
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
- .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -107,9 +106,8 @@
numSecondaryKeys,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index a9b3881..9f80568 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -26,6 +26,7 @@
import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
@@ -296,7 +297,7 @@
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 40e0aa9..72d0d70 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -283,16 +283,16 @@
if (!isPartitioned) {
return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
} else {
return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate());
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index ec62068..4d8118d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -91,10 +91,9 @@
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
- keyType, secondaryComparatorFactories.length), storageProperties
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+ secondaryComparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -171,17 +170,14 @@
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
// Create secondary RTree bulk load op.
- TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
- spec,
- numNestedSecondaryKeyFields,
- new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
- LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
- keyType, secondaryComparatorFactories.length), storageProperties
- .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+ TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec,
+ numNestedSecondaryKeyFields, new LSMRTreeDataflowHelperFactory(valueProviderFactories,
+ RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+ storageProperties.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index 3610478..7e7ffd9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -17,52 +17,18 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
public class BaseOperationTracker implements ILSMOperationTracker {
protected final DatasetLifecycleManager datasetLifecycleManager;
- protected final ILSMIOOperationCallback ioOpCallback;
- protected long lastLSN;
- protected long firstLSN;
protected final int datasetID;
- public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
+ public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
this.datasetLifecycleManager = datasetLifecycleManager;
- this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
- .createIOOperationCallback(this);
this.datasetID = datasetID;
- resetLSNs();
- }
-
- public ILSMIOOperationCallback getIOOperationCallback() {
- return ioOpCallback;
- }
-
- public long getLastLSN() {
- return lastLSN;
- }
-
- public long getFirstLSN() {
- return firstLSN;
- }
-
- public void updateLastLSN(long lastLSN) {
- if (firstLSN == -1) {
- firstLSN = lastLSN;
- }
- this.lastLSN = Math.max(this.lastLSN, lastLSN);
- }
-
- public void resetLSNs() {
- lastLSN = -1;
- firstLSN = -1;
}
@Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
index cf69bfe..dee652e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
@@ -43,7 +43,7 @@
if (!ctx.isShuttingdown() && immutableComponents.size() >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(index.getIOOperationCallback());
}
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 617b6ff..6335fb1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -26,7 +26,6 @@
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -253,7 +252,7 @@
if (iInfo.isOpen) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
+ accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
}
// Wait for the above flush op.
while (dsInfo.numActiveIOOps > 0) {
@@ -316,8 +315,7 @@
synchronized (datasetOpTrackers) {
ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
if (opTracker == null) {
- opTracker = new PrimaryIndexOperationTracker(this, datasetID,
- LSMBTreeIOOperationCallbackFactory.INSTANCE);
+ opTracker = new PrimaryIndexOperationTracker(this, datasetID);
datasetOpTrackers.put(datasetID, opTracker);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 2ed4b0ec..ef58937 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,11 +18,11 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
@@ -33,9 +33,8 @@
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
- public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
- ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
- super(datasetLifecycleManager, ioOpCallbackFactory, datasetID);
+ public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
+ super(datasetLifecycleManager, datasetID);
this.numActiveOperations = new AtomicInteger();
}
@@ -43,7 +42,7 @@
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations.incrementAndGet();
+ incrementNumActiveOperations(modificationCallback);
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
datasetLifecycleManager.declareActiveIOOperation(datasetID);
}
@@ -62,14 +61,11 @@
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations.decrementAndGet();
+ decrementNumActiveOperations(modificationCallback);
+ flushIfFull();
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
}
-
- if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- flushIfFull();
- }
}
private void flushIfFull() throws HyracksDataException {
@@ -88,8 +84,7 @@
for (ILSMIndex lsmIndex : indexes) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- accessor.scheduleFlush(((BaseOperationTracker) lsmIndex.getOperationTracker())
- .getIOOperationCallback());
+ accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
}
}
}
@@ -103,4 +98,27 @@
public int getNumActiveOperations() {
return numActiveOperations.get();
}
+
+ private void incrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+ //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
+ if (modificationCallback != NoOpOperationCallback.INSTANCE) {
+ numActiveOperations.incrementAndGet();
+ ((AbstractOperationCallback) modificationCallback).incrementLocalNumActiveOperations();
+ }
+ }
+
+ private void decrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+ //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
+ if (modificationCallback != NoOpOperationCallback.INSTANCE) {
+ numActiveOperations.decrementAndGet();
+ ((AbstractOperationCallback) modificationCallback).decrementLocalNumActiveOperations();
+ }
+ }
+
+ public void cleanupNumActiveOperationsForAbortedJob(AbstractOperationCallback callback) {
+ int delta = callback.getLocalNumActiveOperations() * -1;
+ numActiveOperations.getAndAdd(delta);
+ callback.resetLocalNumActiveOperations();
+ }
+
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index da08cd8..684068b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -17,34 +17,51 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
- protected final BaseOperationTracker opTracker;
+ protected long firstLSN;
+ protected long lastLSN;
+ protected long[] immutableLastLSNs;
+ protected int readIndex;
+ protected int writeIndex;
- public AbstractLSMIOOperationCallback(BaseOperationTracker opTracker) {
- this.opTracker = opTracker;
+ public AbstractLSMIOOperationCallback() {
+ resetLSNs();
}
@Override
- public void beforeOperation() {
+ public void setNumOfMutableComponents(int count) {
+ immutableLastLSNs = new long[count];
+ readIndex = 0;
+ writeIndex = 0;
+ }
+
+ @Override
+ public void beforeOperation(LSMOperationType opType) {
+ if (opType == LSMOperationType.FLUSH) {
+ synchronized (this) {
+ immutableLastLSNs[writeIndex] = lastLSN;
+ writeIndex = (writeIndex + 1) % immutableLastLSNs.length;
+ resetLSNs();
+ }
+ }
+ }
+
+ @Override
+ public void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) {
// Do nothing.
}
- @Override
- public void afterFinalize(ILSMComponent newComponent) {
- opTracker.resetLSNs();
- }
-
public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
protected void putLSNIntoMetadata(ITreeIndex treeIndex, List<ILSMComponent> oldComponents)
@@ -80,4 +97,25 @@
bufferCache.unpin(metadataPage);
}
}
+
+ protected void resetLSNs() {
+ firstLSN = -1;
+ lastLSN = -1;
+ }
+
+ public void updateLastLSN(long lastLSN) {
+ if (firstLSN == -1) {
+ firstLSN = lastLSN;
+ }
+ this.lastLSN = Math.max(this.lastLSN, lastLSN);
+ }
+
+ public long getFirstLSN() {
+ return firstLSN;
+ }
+
+ public long getLastLSN() {
+ return lastLSN;
+ }
+
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index b6025cb..8e9b44e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -17,22 +17,22 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMBTreeIOOperationCallback(BaseOperationTracker opTracker) {
- super(opTracker);
+ public LSMBTreeIOOperationCallback() {
+ super();
}
@Override
- public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
- if (oldComponents != null && newComponent != null) {
+ if (newComponent != null) {
LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent;
putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
}
@@ -42,7 +42,11 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return opTracker.getLastLSN();
+ synchronized (this) {
+ long lsn = immutableLastLSNs[readIndex];
+ readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ return lsn;
+ }
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 92ba9ec..1028015 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
@@ -29,7 +28,7 @@
}
@Override
- public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMBTreeIOOperationCallback((BaseOperationTracker) syncObj);
+ public ILSMIOOperationCallback createIOOperationCallback() {
+ return new LSMBTreeIOOperationCallback();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 4f99ae6..5532f97 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -17,21 +17,21 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMInvertedIndexIOOperationCallback(BaseOperationTracker opTracker) {
- super(opTracker);
+ public LSMInvertedIndexIOOperationCallback() {
+ super();
}
@Override
- public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
- if (oldComponents != null && newComponent != null) {
+ if (newComponent != null) {
LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) newComponent;
putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
}
@@ -41,7 +41,11 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return opTracker.getLastLSN();
+ synchronized (this) {
+ long lsn = immutableLastLSNs[readIndex];
+ readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ return lsn;
+ }
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index c20cdb3..5dc0c0b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
@@ -29,7 +28,7 @@
}
@Override
- public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMInvertedIndexIOOperationCallback((BaseOperationTracker) syncObj);
+ public ILSMIOOperationCallback createIOOperationCallback() {
+ return new LSMInvertedIndexIOOperationCallback();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index cd7b7a0..1497e17 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -17,21 +17,21 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMRTreeIOOperationCallback(BaseOperationTracker opTracker) {
- super(opTracker);
+ public LSMRTreeIOOperationCallback() {
+ super();
}
@Override
- public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
- if (oldComponents != null && newComponent != null) {
+ if (newComponent != null) {
LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent;
putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
putLSNIntoMetadata(rtreeComponent.getBTree(), oldComponents);
@@ -42,7 +42,11 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return opTracker.getLastLSN();
+ synchronized (this) {
+ long lsn = immutableLastLSNs[readIndex];
+ readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ return lsn;
+ }
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 0cd2539..841a1d5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.common.ioopcallbacks;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
@@ -29,7 +28,7 @@
}
@Override
- public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
- return new LSMRTreeIOOperationCallback((BaseOperationTracker) syncObj);
+ public ILSMIOOperationCallback createIOOperationCallback() {
+ return new LSMRTreeIOOperationCallback();
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
index d4b26f7..c549e7d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.common.transactions;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
@@ -27,6 +29,7 @@
protected final ITransactionContext txnCtx;
protected final ILockManager lockManager;
protected final long[] longHashes;
+ protected final AtomicInteger transactorLocalNumActiveOperations;
public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
ILockManager lockManager) {
@@ -34,6 +37,7 @@
this.primaryKeyFields = primaryKeyFields;
this.txnCtx = txnCtx;
this.lockManager = lockManager;
+ this.transactorLocalNumActiveOperations = new AtomicInteger(0);
this.longHashes = new long[2];
}
@@ -41,4 +45,21 @@
MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
return Math.abs((int) longHashes[0]);
}
+
+ public void resetLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations.set(0);
+ }
+
+ public int getLocalNumActiveOperations() {
+ return transactorLocalNumActiveOperations.get();
+ }
+
+ public void incrementLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations.incrementAndGet();
+ }
+
+ public void decrementLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations.decrementAndGet();
+ }
+
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 2f522b9..2638c9f 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -18,7 +18,6 @@
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -44,14 +43,6 @@
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
- public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary);
-
- public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider();
-
- public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider();
-
- public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider();
-
public ILSMIOOperationScheduler getLSMIOScheduler();
public ILocalResourceRepository getLocalResourceRepository();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 8913f8a..27a91a4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -14,9 +14,11 @@
*/
package edu.uci.ics.asterix.common.transactions;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+
public interface ILogManager {
- public void log(ILogRecord logRecord);
+ public void log(ILogRecord logRecord) throws ACIDException;
public ILogReader getLogReader(boolean isRecoveryMode);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index d13ef6c..3068867 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -20,15 +20,15 @@
public interface ILogRecord {
- public static final int JOB_COMMIT_LOG_SIZE = 13;
- public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 29;
- public static final int UPDATE_LOG_BASE_SIZE = 64;
+ public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
+ public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
+ public static final int UPDATE_LOG_BASE_SIZE = 60;
public boolean readLogRecord(ByteBuffer buffer);
public void writeLogRecord(ByteBuffer buffer);
- public void formJobCommitLogRecord(ITransactionContext txnCtx);
+ public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit);
public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
ITupleReference tupleReference, int[] primaryKeyFields);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 77e960b..ffd4cc2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -52,10 +52,11 @@
*
* @param jobId
* a unique value for the transaction id.
+ * @param createIfNotExist TODO
* @return
* @throws ACIDException
*/
- public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException;
+ public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException;
/**
* Commits a transaction.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 8765aae..ee0791b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -115,14 +115,15 @@
@Override
public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
}
@Override
public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
try {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
+ false);
transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
} catch (ACIDException e) {
e.printStackTrace();
@@ -132,13 +133,13 @@
@Override
public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
}
@Override
public void unlock(JobId jobId) throws ACIDException, RemoteException {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
}
@@ -273,7 +274,7 @@
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
@@ -286,7 +287,7 @@
private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
if (metadataIndex.isPrimaryIndex()) {
return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
@@ -581,7 +582,7 @@
lsmIndex, IndexOperation.DELETE);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 4a02bc5..7be4e8e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -60,7 +60,6 @@
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -333,14 +332,14 @@
int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
LSMBTree lsmBtree = null;
long resourceID = -1;
- AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
.getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
- LSMBTreeIOOperationCallbackFactory.INSTANCE, index.getDatasetId().getId());
+ index.getDatasetId().getId());
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
- runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(), rtcProvider);
+ runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(),
+ LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
lsmBtree.create();
resourceID = runtimeContext.getResourceIdFactory().createId();
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
@@ -359,7 +358,7 @@
typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
opTracker, runtimeContext.getLSMIOScheduler(),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+ LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
indexLifecycleManager.register(resourceID, lsmBtree);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 8b422c8..4909e21 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -579,11 +579,10 @@
typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
- isSecondary ? new SecondaryIndexOperationTrackerProvider(
- LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
+ isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
: new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
- rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
- searchCallbackFactory);
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
@@ -649,11 +648,11 @@
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+ comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+ retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
} catch (MetadataException me) {
@@ -809,7 +808,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
@@ -877,7 +876,7 @@
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1071,9 +1070,9 @@
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
@@ -1199,10 +1198,9 @@
invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, storageProperties
.getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
splitsAndConstraint.second);
@@ -1294,12 +1292,11 @@
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length),
- storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
- modificationCallbackFactory, false);
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+ comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+ filterFactory, modificationCallbackFactory, false);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index b44755c..4f4eba2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -49,7 +49,7 @@
try {
ITransactionManager txnManager = ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext()
.getApplicationObject()).getTransactionSubsystem().getTransactionManager();
- ITransactionContext txnContext = txnManager.getTransactionContext(jobId);
+ ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
txnContext.setWriteTxn(transactionalWrite);
txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
!(jobStatus == JobStatus.FAILURE));
@@ -62,7 +62,7 @@
public void jobletStart() {
try {
((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
- .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId);
+ .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true);
} catch (ACIDException e) {
throw new Error(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index a6537cd..00f0c4a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
return new PrimaryIndexInstantSearchOperationCallback(datasetId, primaryKeyFields,
txnSubsystem.getLockManager(), txnCtx);
} catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index abeec62..07daee4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -58,7 +58,7 @@
}
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
indexOp);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index b59eb75..01cb725 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
txnCtx);
} catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index d5bc877..563e9b7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -55,7 +55,7 @@
}
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields, txnCtx,
txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType, indexOp);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index aec378b..140a8dd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -18,7 +18,6 @@
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
@@ -27,10 +26,8 @@
private static final long serialVersionUID = 1L;
private final int datasetID;
- private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
- public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
- this.ioOpCallbackFactory = ioOpCallbackFactory;
+ public SecondaryIndexOperationTrackerProvider(int datasetID) {
this.datasetID = datasetID;
}
@@ -38,7 +35,7 @@
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
.getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
- return new BaseOperationTracker(dslcManager, ioOpCallbackFactory, datasetID);
+ return new BaseOperationTracker(dslcManager, datasetID);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index d243dd2..cf60182 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -55,10 +55,11 @@
LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider
.getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
- : new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMBTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(isPrimary));
+ .getLSMMergePolicy(),
+ isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+ runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
+ .createIOOperationCallback());
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 8482172..5ed7019 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -59,25 +59,37 @@
List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
try {
if (isPartitioned) {
- return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
- .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
- tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider
- .getLSMInvertedIndexIOOperationCallbackProvider());
+ return InvertedIndexUtils.createPartitionedLSMInvertedIndex(
+ virtualBufferCaches,
+ runtimeContextProvider.getFileMapManager(),
+ invListTypeTraits,
+ invListCmpFactories,
+ tokenTypeTraits,
+ tokenCmpFactories,
+ tokenizerFactory,
+ runtimeContextProvider.getBufferCache(),
+ filePath,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
} else {
- return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
- .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
- tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider
- .getLSMInvertedIndexIOOperationCallbackProvider());
+ return InvertedIndexUtils.createLSMInvertedIndex(
+ virtualBufferCaches,
+ runtimeContextProvider.getFileMapManager(),
+ invListTypeTraits,
+ invListCmpFactories,
+ tokenTypeTraits,
+ tokenCmpFactories,
+ tokenizerFactory,
+ runtimeContextProvider.getBufferCache(),
+ filePath,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ runtimeContextProvider.getLSMMergePolicy(),
+ new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+ LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
}
} catch (IndexException e) {
throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index bc1e889..dc6b30b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -66,10 +66,9 @@
runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
runtimeContextProvider.getLSMMergePolicy(), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- LSMRTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
- linearizeCmpFactory);
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+ runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
+ .createIOOperationCallback(), linearizeCmpFactory);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 6d86f70..c7df2f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -665,8 +665,6 @@
latchLockTable();
try {
- validateJob(txnContext);
-
if (IS_DEBUG_MODE) {
trackLockRequest("Requested", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
dLockInfo, eLockInfo);
@@ -2212,14 +2210,14 @@
if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
tempDatasetIdObj.setId(logRecord.getDatasetId());
tempJobIdObj.setId(logRecord.getJobId());
- txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
txnCtx.notifyOptracker(false);
- } else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+ } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
tempJobIdObj.setId(logRecord.getJobId());
- txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
txnCtx.notifyOptracker(true);
- ((LogPage) logPage).notifyJobCommitter();
+ ((LogPage) logPage).notifyJobTerminator();
}
logRecord = logPageReader.next();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 4f0bb59..933afcd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -38,6 +38,7 @@
import edu.uci.ics.asterix.common.transactions.ILogReader;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
@@ -74,16 +75,16 @@
logDir = logManagerProperties.getLogDir();
logFilePrefix = logManagerProperties.getLogFilePrefix();
flushLSN = new MutableLong();
- initializeLogManager();
+ initializeLogManager(0);
}
- private void initializeLogManager() {
+ private void initializeLogManager(long nextLogFileId) {
emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
}
- appendLSN = initializeLogAnchor();
+ appendLSN = initializeLogAnchor(nextLogFileId);
flushLSN.set(appendLSN);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
@@ -95,12 +96,13 @@
}
@Override
- public void log(ILogRecord logRecord) {
+ public void log(ILogRecord logRecord) throws ACIDException {
if (logRecord.getLogSize() > logPageSize) {
throw new IllegalStateException();
}
syncLog(logRecord);
- if (logRecord.getLogType() == LogType.JOB_COMMIT && !logRecord.isFlushed()) {
+ if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
+ && !logRecord.isFlushed()) {
synchronized (logRecord) {
while (!logRecord.isFlushed()) {
try {
@@ -113,13 +115,16 @@
}
}
- private synchronized void syncLog(ILogRecord logRecord) {
+ private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
ITransactionContext txnCtx = logRecord.getTxnCtx();
+ if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
+ throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+ }
if (getLogFileOffset(appendLSN) + logRecord.getLogSize() > logFileSize) {
prepareNextLogFile();
appendPage.isFull(true);
getAndInitNewPage();
- } else if (!appendPage.hasSpace(logRecord.getLogSize(), getLogFileOffset(appendLSN))) {
+ } else if (!appendPage.hasSpace(logRecord.getLogSize())) {
appendPage.isFull(true);
getAndInitNewPage();
}
@@ -141,7 +146,6 @@
}
appendPage.reset();
appendPage.setFileChannel(appendChannel);
- appendPage.setInitialFlushOffset(getLogFileOffset(appendLSN));
flushQ.offer(appendPage);
}
@@ -229,7 +233,7 @@
return flushLSN;
}
- private long initializeLogAnchor() {
+ private long initializeLogAnchor(long nextLogFileId) {
long fileId = 0;
long offset = 0;
File fileLogDir = new File(logDir);
@@ -237,9 +241,10 @@
if (fileLogDir.exists()) {
List<Long> logFileIds = getLogFileIds();
if (logFileIds == null) {
- createFileIfNotExists(getLogFilePath(0));
+ fileId = nextLogFileId;
+ createFileIfNotExists(getLogFilePath(fileId));
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created a log file: " + getLogFilePath(0));
+ LOGGER.info("created a log file: " + getLogFilePath(fileId));
}
} else {
fileId = logFileIds.get(logFileIds.size() - 1);
@@ -247,13 +252,14 @@
offset = logFile.length();
}
} else {
+ fileId = nextLogFileId;
createNewDirectory(logDir);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
}
- createFileIfNotExists(getLogFilePath(0));
+ createFileIfNotExists(getLogFilePath(fileId));
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created a log file: " + getLogFilePath(0));
+ LOGGER.info("created a log file: " + getLogFilePath(fileId));
}
}
} catch (IOException ioe) {
@@ -267,8 +273,8 @@
public void renewLogFiles() {
terminateLogFlusher();
- deleteAllLogFiles();
- initializeLogManager();
+ long lastMaxLogFileId = deleteAllLogFiles();
+ initializeLogManager(lastMaxLogFileId + 1);
}
private void terminateLogFlusher() {
@@ -290,7 +296,7 @@
}
}
- private void deleteAllLogFiles() {
+ private long deleteAllLogFiles() {
if (appendChannel != null) {
try {
appendChannel.close();
@@ -305,6 +311,7 @@
throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
}
}
+ return logFileIds.get(logFileIds.size() - 1);
}
private List<Long> getLogFileIds() {
@@ -384,10 +391,16 @@
}
return newFileChannel;
}
+
+ public long getReadableSmallestLSN() {
+ List<Long> logFileIds = getLogFileIds();
+ return logFileIds.get(0) * logFileSize;
+ }
}
class LogFlusher implements Callable<Boolean> {
- private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
+ private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
+ private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<LogPage> emptyQ;
private final LinkedBlockingQueue<LogPage> flushQ;
@@ -402,13 +415,13 @@
flushPage = null;
isStarted = new AtomicBoolean(false);
terminateFlag = new AtomicBoolean(false);
-
+
}
public void terminate() {
//make sure the LogFlusher thread started before terminating it.
synchronized (isStarted) {
- while(!isStarted.get()) {
+ while (!isStarted.get()) {
try {
isStarted.wait();
} catch (InterruptedException e) {
@@ -416,7 +429,7 @@
}
}
}
-
+
terminateFlag.set(true);
if (flushPage != null) {
synchronized (flushPage) {
@@ -432,24 +445,34 @@
@Override
public Boolean call() {
- synchronized(isStarted) {
+ synchronized (isStarted) {
isStarted.set(true);
isStarted.notify();
}
- while (true) {
- flushPage = null;
- try {
- flushPage = flushQ.take();
- if (flushPage == POISON_PILL || terminateFlag.get()) {
- return true;
+ try {
+ while (true) {
+ flushPage = null;
+ try {
+ flushPage = flushQ.take();
+ if (flushPage == POISON_PILL || terminateFlag.get()) {
+ return true;
+ }
+ } catch (InterruptedException e) {
+ if (flushPage == null) {
+ continue;
+ }
}
- } catch (InterruptedException e) {
- if (flushPage == null) {
- continue;
- }
+ flushPage.flush();
+ emptyQ.offer(flushPage);
}
- flushPage.flush();
- emptyQ.offer(flushPage);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("-------------------------------------------------------------------------");
+ LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state.");
+ LOGGER.info("-------------------------------------------------------------------------");
+ }
+ e.printStackTrace();
+ throw e;
}
}
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index edfec69..a3f42a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -58,7 +58,7 @@
appendOffset = 0;
flushOffset = 0;
isLastPage = false;
- syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_COMMIT_LOG_SIZE);
+ syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
}
////////////////////////////////////
@@ -68,15 +68,14 @@
@Override
public void append(ILogRecord logRecord, long appendLSN) {
logRecord.writeLogRecord(appendBuffer);
- if (logRecord.getLogType() == LogType.UPDATE) {
- logRecord.getTxnCtx().setLastLSN(logRecord.getResourceId(), appendLSN);
- }
+ logRecord.getTxnCtx().setLastLSN(logRecord.getLogType() == LogType.UPDATE ? logRecord.getResourceId() : -1,
+ appendLSN);
synchronized (this) {
appendOffset += logRecord.getLogSize();
if (IS_DEBUG_MODE) {
LOGGER.info("append()| appendOffset: " + appendOffset);
}
- if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
logRecord.isFlushed(false);
syncCommitQ.offer(logRecord);
}
@@ -105,7 +104,7 @@
this.isLastPage = isLastPage;
}
- public boolean hasSpace(int logSize, long logFileOffset) {
+ public boolean hasSpace(int logSize) {
return appendOffset + logSize <= logPageSize;
}
@@ -192,7 +191,7 @@
}
}
- public void notifyJobCommitter() {
+ public void notifyJobTerminator() {
ILogRecord logRecord = null;
while (logRecord == null) {
try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 4b0e1f2..dd81df7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -34,10 +34,9 @@
* LogType(1)
* JobId(4)
* ---------------------------
- * [Header2] (16 bytes + PKValueSize) : for entity_commit and update log types
+ * [Header2] (12 bytes + PKValueSize) : for entity_commit and update log types
* DatasetId(4) //stored in dataset_dataset in Metadata Node
* PKHashValue(4)
- * PKFieldCnt(4)
* PKValueSize(4)
* PKValue(PKValueSize)
* ---------------------------
@@ -61,10 +60,10 @@
* ---------------------------
* = LogSize =
* 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
- * 2) ENTITY_COMMIT: 29 + PKSize (5 + 16 + PKSize + 8)
- * --> ENTITY_COMMIT_LOG_BASE_SIZE = 29
- * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 16 + PKSize + 21 + 14 + New/OldValueSize + 8)
- * --> UPDATE_LOG_BASE_SIZE = 64
+ * 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
+ * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 12 + PKSize + 21 + 14 + New/OldValueSize + 8)
+ * --> UPDATE_LOG_BASE_SIZE = 60
*/
public class LogRecord implements ILogRecord {
@@ -73,7 +72,6 @@
private int jobId;
private int datasetId;
private int PKHashValue;
- private int PKFieldCnt;
private int PKValueSize;
private ITupleReference PKValue;
private long prevLSN;
@@ -90,12 +88,13 @@
private long checksum;
//------------- fields in a log record (end) --------------//
+ private int PKFieldCnt;
private static final int CHECKSUM_SIZE = 8;
private ITransactionContext txnCtx;
private long LSN;
private final AtomicBoolean isFlushed;
private final SimpleTupleWriter tupleWriter;
- private final SimpleTupleReference readPKValue;
+ private final PrimaryKeyTupleReference readPKValue;
private final SimpleTupleReference readNewValue;
private final SimpleTupleReference readOldValue;
private final CRC32 checksumGen;
@@ -104,7 +103,7 @@
public LogRecord() {
isFlushed = new AtomicBoolean(false);
tupleWriter = new SimpleTupleWriter();
- readPKValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readPKValue = new PrimaryKeyTupleReference();
readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
readOldValue = (SimpleTupleReference) tupleWriter.createTupleReference();
checksumGen = new CRC32();
@@ -115,10 +114,9 @@
int beginOffset = buffer.position();
buffer.put(logType);
buffer.putInt(jobId);
- if (logType != LogType.JOB_COMMIT) {
+ if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT) {
buffer.putInt(datasetId);
buffer.putInt(PKHashValue);
- buffer.putInt(PKFieldCnt);
if (PKValueSize <= 0) {
throw new IllegalStateException("Primary Key Size is less than or equal to 0");
}
@@ -173,13 +171,12 @@
try {
logType = buffer.get();
jobId = buffer.getInt();
- if (logType == LogType.JOB_COMMIT) {
+ if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
datasetId = -1;
PKHashValue = -1;
} else {
- datasetId = buffer.getInt();
+ datasetId = buffer.getInt();
PKHashValue = buffer.getInt();
- PKFieldCnt = buffer.getInt();
PKValueSize = buffer.getInt();
if (PKValueSize <= 0) {
throw new IllegalStateException("Primary Key Size is less than or equal to 0");
@@ -217,12 +214,20 @@
}
return true;
}
-
+
private ITupleReference readPKValue(ByteBuffer buffer) {
- return readTuple(buffer, readPKValue, PKFieldCnt, PKValueSize);
+ if (buffer.position() + PKValueSize > buffer.limit()) {
+ throw new BufferUnderflowException();
+ }
+ readPKValue.reset(buffer.array(), buffer.position(), PKValueSize);
+ buffer.position(buffer.position() + PKValueSize);
+ return readPKValue;
}
private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+ if (srcBuffer.position() + size > srcBuffer.limit()) {
+ throw new BufferUnderflowException();
+ }
destTuple.setFieldCount(fieldCnt);
destTuple.resetByTupleOffset(srcBuffer, srcBuffer.position());
srcBuffer.position(srcBuffer.position() + size);
@@ -230,9 +235,9 @@
}
@Override
- public void formJobCommitLogRecord(ITransactionContext txnCtx) {
+ public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit) {
this.txnCtx = txnCtx;
- this.logType = LogType.JOB_COMMIT;
+ this.logType = isCommit ? LogType.JOB_COMMIT : LogType.ABORT;
this.jobId = txnCtx.getJobId().getId();
this.datasetId = -1;
this.PKHashValue = -1;
@@ -281,7 +286,8 @@
setUpdateLogSize();
break;
case LogType.JOB_COMMIT:
- logSize = JOB_COMMIT_LOG_SIZE;
+ case LogType.ABORT:
+ logSize = JOB_TERMINATE_LOG_SIZE;
break;
case LogType.ENTITY_COMMIT:
logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
@@ -298,7 +304,7 @@
builder.append(" LogType : ").append(LogType.toString(logType));
builder.append(" LogSize : ").append(logSize);
builder.append(" JobId : ").append(jobId);
- if (logType != LogType.JOB_COMMIT) {
+ if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
builder.append(" DatasetId : ").append(datasetId);
builder.append(" PKHashValue : ").append(PKHashValue);
builder.append(" PKFieldCnt : ").append(PKFieldCnt);
@@ -496,17 +502,17 @@
public void setLSN(long LSN) {
this.LSN = LSN;
}
-
+
@Override
public int getPKValueSize() {
return PKValueSize;
}
-
+
@Override
public ITupleReference getPKValue() {
return PKValue;
}
-
+
@Override
public void setPKFields(int[] primaryKeyFields) {
PKFields = primaryKeyFields;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
index 823c8d3..f9e9304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
@@ -19,11 +19,14 @@
public static final byte UPDATE = 0;
public static final byte JOB_COMMIT = 1;
public static final byte ENTITY_COMMIT = 2;
+ public static final byte ABORT = 3;
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
+ private static final String STRING_ABORT = "ABORT";
private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
+
public static String toString(byte logType) {
switch (logType) {
case LogType.UPDATE:
@@ -32,6 +35,8 @@
return STRING_JOB_COMMIT;
case LogType.ENTITY_COMMIT:
return STRING_ENTITY_COMMIT;
+ case LogType.ABORT:
+ return STRING_ABORT;
default:
return STRING_INVALID_LOG_TYPE;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
new file mode 100644
index 0000000..d45b209
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class PrimaryKeyTupleReference implements ITupleReference {
+ private byte[] fieldData;
+ private int start;
+ private int length;
+
+ public void reset(byte[] fieldData, int start, int length) {
+ this.fieldData = fieldData;
+ this.start = start;
+ this.length = length;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return 1;
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return fieldData;
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return start;
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return length;
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index 6f6da4a..dca14d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -16,8 +16,8 @@
import java.util.List;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -70,7 +70,8 @@
long firstLSN;
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+ .getFirstLSN();
minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
}
} else {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 2ad3055..95ee767 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -38,7 +38,6 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -77,6 +76,7 @@
private final TransactionSubsystem txnSubsystem;
private final LogManager logMgr;
private final int checkpointHistory;
+ private final long SHARP_CHECKPOINT_LSN = -1;
/**
* A file at a known location that contains the LSN of the last log record
@@ -115,18 +115,23 @@
return state;
}
- //#. if minMCTFirstLSN is equal to -1 &&
- // checkpointLSN in the checkpoint file is equal to the lastLSN in the log file,
- // then return healthy state. Otherwise, return corrupted.
- if ((checkpointObject.getMinMCTFirstLsn() == -2 && logMgr.getAppendLSN() == 0)
- || (checkpointObject.getMinMCTFirstLsn() == -1 && checkpointObject.getCheckpointLsn() == logMgr
- .getAppendLSN())) {
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+ if (logMgr.getAppendLSN() == readableSmallestLSN) {
+ if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("[Warning] ---------------------------------------------------");
+ LOGGER.info("[Warning] Some(or all) of transaction log files are lost.");
+ LOGGER.info("[Warning] ---------------------------------------------------");
+ //No choice but continuing when the log files are lost.
+ }
+ }
+ state = SystemState.HEALTHY;
+ return state;
+ } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
+ && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
state = SystemState.HEALTHY;
return state;
} else {
- if (logMgr.getAppendLSN() == 0) {
- throw new IllegalStateException("Transaction log files are lost.");
- }
state = SystemState.CORRUPTED;
return state;
}
@@ -138,6 +143,7 @@
int entityCommitLogCount = 0;
int jobCommitLogCount = 0;
int redoCount = 0;
+ int abortLogCount = 0;
int jobId = -1;
state = SystemState.RECOVERING;
@@ -154,10 +160,11 @@
TxnId winnerEntity = null;
//#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
CheckpointObject checkpointObject = readCheckpoint();
- long lowWaterMarkLsn = checkpointObject.getMinMCTFirstLsn();
- if (lowWaterMarkLsn == -1 || lowWaterMarkLsn == -2) {
- lowWaterMarkLsn = 0;
+ long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+ if (lowWaterMarkLSN < readableSmallestLSN) {
+ lowWaterMarkLSN = readableSmallestLSN;
}
int maxJobId = checkpointObject.getMaxJobId();
@@ -171,11 +178,11 @@
//#. set log reader to the lowWaterMarkLsn
ILogReader logReader = logMgr.getLogReader(true);
- logReader.initializeScan(lowWaterMarkLsn);
+ logReader.initializeScan(lowWaterMarkLSN);
ILogRecord logRecord = logReader.next();
while (logRecord != null) {
if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
//update max jobId
if (logRecord.getJobId() > maxJobId) {
@@ -183,16 +190,12 @@
}
switch (logRecord.getLogType()) {
case LogType.UPDATE:
- if (IS_DEBUG_MODE) {
- updateLogCount++;
- }
+ updateLogCount++;
break;
case LogType.JOB_COMMIT:
winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
- if (IS_DEBUG_MODE) {
- jobCommitLogCount++;
- }
+ jobCommitLogCount++;
break;
case LogType.ENTITY_COMMIT:
jobId = logRecord.getJobId();
@@ -205,9 +208,10 @@
winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
}
winnerEntitySet.add(winnerEntity);
- if (IS_DEBUG_MODE) {
- entityCommitLogCount++;
- }
+ entityCommitLogCount++;
+ break;
+ case LogType.ABORT:
+ abortLogCount++;
break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -239,11 +243,11 @@
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
//#. set log reader to the lowWaterMarkLsn again.
- logReader.initializeScan(lowWaterMarkLsn);
+ logReader.initializeScan(lowWaterMarkLSN);
logRecord = logReader.next();
while (logRecord != null) {
- if (LogManager.IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
LSN = logRecord.getLSN();
jobId = logRecord.getJobId();
@@ -284,6 +288,7 @@
* log record.
*******************************************************************/
if (localResource == null) {
+ logRecord = logReader.next();
continue;
}
/*******************************************************************/
@@ -297,10 +302,8 @@
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = (ILSMIndex) index;
- BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
- AbstractLSMIOOperationCallback abstractLSMIOCallback = (AbstractLSMIOOperationCallback) indexOpTracker
- .getIOOperationCallback();
- maxDiskLastLsn = abstractLSMIOCallback.getComponentLSN(index.getImmutableComponents());
+ maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+ .getComponentLSN(lsmIndex.getImmutableComponents());
//#. set resourceId and maxDiskLastLSN to the map
resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
@@ -310,15 +313,14 @@
if (LSN > maxDiskLastLsn) {
redo(logRecord);
- if (IS_DEBUG_MODE) {
- redoCount++;
- }
+ redoCount++;
}
}
break;
case LogType.JOB_COMMIT:
case LogType.ENTITY_COMMIT:
+ case LogType.ABORT:
//do nothing
break;
@@ -338,10 +340,8 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("[RecoveryMgr] recovery is completed.");
- }
- if (IS_DEBUG_MODE) {
- System.out.println("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
- + entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
+ LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = " + updateLogCount + "/"
+ + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/" + redoCount);
}
}
@@ -375,9 +375,8 @@
ILSMIndex lsmIndex = (ILSMIndex) index;
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
- indexOpTracker.getIOOperationCallback());
+ lsmIndex.getIOOperationCallback());
callbackList.add(cb);
try {
indexAccessor.scheduleFlush(cb);
@@ -393,17 +392,18 @@
throw new ACIDException(e);
}
}
- minMCTFirstLSN = -2;
+ minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
} else {
long firstLSN;
minMCTFirstLSN = Long.MAX_VALUE;
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+ .getFirstLSN();
minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
}
} else {
- minMCTFirstLSN = -1;
+ minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
}
}
CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
@@ -580,7 +580,7 @@
break;
} else {
if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
currentLSN = logRecord.getLSN();
}
@@ -600,9 +600,9 @@
loserTxnTable.put(loserEntity, undoLSNSet);
}
undoLSNSet.add(Long.valueOf(currentLSN));
+ updateLogCount++;
if (IS_DEBUG_MODE) {
- updateLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+ LOGGER.info("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+ tempKeyTxnId);
}
break;
@@ -611,14 +611,21 @@
throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
case LogType.ENTITY_COMMIT:
- loserTxnTable.remove(tempKeyTxnId);
+ undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+ if (undoLSNSet == null) {
+ undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+ }
+ entityCommitLogCount++;
if (IS_DEBUG_MODE) {
- entityCommitLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+ LOGGER.info("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+ tempKeyTxnId);
}
break;
+ case LogType.ABORT:
+ //ignore
+ break;
+
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
@@ -650,23 +657,17 @@
throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
}
if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
undo(logRecord);
- if (IS_DEBUG_MODE) {
- undoCount++;
- }
+ undoCount++;
}
}
-
logReader.close();
-
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" undone loser transaction's effect");
- }
- if (IS_DEBUG_MODE) {
- System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount
- + "/" + undoCount);
+ LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" + entityCommitLogCount + "/"
+ + undoCount);
}
}
@@ -721,7 +722,7 @@
} else {
throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
}
- ((BaseOperationTracker) index.getOperationTracker()).updateLastLSN(logRecord.getLSN());
+ ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()).updateLastLSN(logRecord.getLSN());
} catch (Exception e) {
throw new IllegalStateException("Failed to redo", e);
}
@@ -765,6 +766,7 @@
this.datasetId = datasetId;
this.pkHashValue = pkHashValue;
this.tupleReferencePKValue = pkValue;
+ this.pkSize = pkSize;
isByteArrayPKValue = false;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 59a8363..5d4806d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -15,15 +15,11 @@
package edu.uci.ics.asterix.transaction.management.service.transaction;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -33,18 +29,12 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
- ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider,
- ILSMIOOperationCallbackProvider {
+ ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider {
private static final long serialVersionUID = 1L;
public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
-
- private AsterixRuntimeComponentsProvider() {
- }
- @Override
- public ILSMIOOperationCallback getIOOperationCallback(ILSMIndex index) {
- return ((BaseOperationTracker) index.getOperationTracker()).getIOOperationCallback();
+ private AsterixRuntimeComponentsProvider() {
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 678956b..6fb91c8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -21,9 +21,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
@@ -71,7 +71,7 @@
//indexMap is concurrently accessed by multiple threads,
//so those threads are synchronized on indexMap object itself
- private Map<MutableLong, BaseOperationTracker> indexMap;
+ private Map<MutableLong, AbstractLSMIOOperationCallback> indexMap;
//TODO: fix ComponentLSNs' issues.
//primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be modified accordingly
@@ -97,7 +97,7 @@
isTimeout = false;
isWriteTxn = new AtomicBoolean(false);
isMetadataTxn = false;
- indexMap = new HashMap<MutableLong, BaseOperationTracker>();
+ indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>();
primaryIndex = null;
tempResourceIdForRegister = new MutableLong();
tempResourceIdForSetLSN = new MutableLong();
@@ -114,7 +114,8 @@
}
tempResourceIdForRegister.set(resourceId);
if (!indexMap.containsKey(tempResourceIdForRegister)) {
- indexMap.put(new MutableLong(resourceId), ((BaseOperationTracker) index.getOperationTracker()));
+ indexMap.put(new MutableLong(resourceId),
+ ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()));
}
}
}
@@ -122,16 +123,17 @@
//[Notice]
//This method is called sequentially by the LogAppender threads.
//However, the indexMap is concurrently read and modified through this method and registerIndexAndCallback()
- //TODO: fix issues - 591, 609, 612, and 614.
@Override
public void setLastLSN(long resourceId, long LSN) {
synchronized (indexMap) {
firstLSN.compareAndSet(-1, LSN);
lastLSN.set(Math.max(lastLSN.get(), LSN));
- tempResourceIdForSetLSN.set(resourceId);
- //TODO; create version number tracker and keep LSNs there.
- BaseOperationTracker opTracker = indexMap.get(tempResourceIdForSetLSN);
- opTracker.updateLastLSN(LSN);
+ if (resourceId != -1) {
+ //Non-update log's resourceId is -1.
+ tempResourceIdForSetLSN.set(resourceId);
+ AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
+ ioOpCallback.updateLastLSN(LSN);
+ }
}
}
@@ -221,4 +223,10 @@
public LogRecord getLogRecord() {
return logRecord;
}
+
+ public void cleanupForAbort() {
+ if (primaryIndexOpTracker != null) {
+ primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(primaryIndexCallback);
+ }
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 01b38c2..07fc152 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -53,7 +53,12 @@
txnCtx.setTxnState(ITransactionManager.ABORTED);
}
try {
- txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+ if (txnCtx.isWriteTxn()) {
+ LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
+ logRecord.formJobTerminateLogRecord(txnCtx, false);
+ txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+ }
} catch (Exception ae) {
String msg = "Could not complete rollback! System is in an inconsistent state";
if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -62,6 +67,7 @@
ae.printStackTrace();
throw new ACIDException(msg, ae);
} finally {
+ ((TransactionContext) txnCtx).cleanupForAbort();
txnSubsystem.getLockManager().releaseLocks(txnCtx);
transactionContextRepository.remove(txnCtx.getJobId());
}
@@ -69,20 +75,24 @@
@Override
public ITransactionContext beginTransaction(JobId jobId) throws ACIDException {
- return getTransactionContext(jobId);
+ return getTransactionContext(jobId, true);
}
@Override
- public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+ public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException {
setMaxJobId(jobId.getId());
ITransactionContext txnCtx = transactionContextRepository.get(jobId);
if (txnCtx == null) {
- synchronized (this) {
- txnCtx = transactionContextRepository.get(jobId);
- if (txnCtx == null) {
- txnCtx = new TransactionContext(jobId, txnSubsystem);
- transactionContextRepository.put(jobId, txnCtx);
+ if (createIfNotExist) {
+ synchronized (this) {
+ txnCtx = transactionContextRepository.get(jobId);
+ if (txnCtx == null) {
+ txnCtx = new TransactionContext(jobId, txnSubsystem);
+ transactionContextRepository.put(jobId, txnCtx);
+ }
}
+ } else {
+ throw new ACIDException("TransactionContext of " + jobId + " doesn't exist.");
}
}
return txnCtx;
@@ -94,7 +104,7 @@
try {
if (txnCtx.isWriteTxn()) {
LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
- logRecord.formJobCommitLogRecord(txnCtx);
+ logRecord.formJobTerminateLogRecord(txnCtx, true);
txnSubsystem.getLogManager().log(logRecord);
}
} catch (Exception ae) {