changed log record format to deal with undo/redo operation properly
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@763 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
index 6618ddd..5cee728 100644
--- a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
@@ -1,6 +1,6 @@
-check_dataset.aql
-check_datatype.aql
-check_dataverse.aql
-check_index.aql
-check_node.aql
-check_nodegroup.aql
+//check_dataset.aql
+//check_datatype.aql
+//check_dataverse.aql
+//check_index.aql
+//check_node.aql
+//check_nodegroup.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
index 5359017..5fb1c42 100644
--- a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
@@ -1 +1 @@
-customers_orders.aql
\ No newline at end of file
+//customers_orders.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/queries.txt b/asterix-app/src/test/resources/metadata-transactions/queries.txt
index 0762364..5f1589e 100644
--- a/asterix-app/src/test/resources/metadata-transactions/queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/queries.txt
@@ -1,20 +1,20 @@
-create_duplicate_dataset.aql
-create_duplicate_dataverse.aql
-create_duplicate_index.aql
-create_duplicate_nodegroup.aql
-create_duplicate_type.aql
-drop_nonexistent_dataset.aql
-drop_nonexistent_datatype.aql
-drop_nonexistent_dataverse.aql
-drop_nonexistent_index.aql
-drop_nonexistent_nodegroup.aql
-rollback_drop_dataset.aql
-rollback_drop_datatype.aql
-rollback_drop_dataverse.aql
-rollback_drop_index.aql
-rollback_drop_nodegroup.aql
-rollback_new_dataset.aql
-rollback_new_datatype.aql
-rollback_new_dataverse.aql
-rollback_new_index.aql
-rollback_new_nodegroup.aql
+//create_duplicate_dataset.aql
+//create_duplicate_dataverse.aql
+//create_duplicate_index.aql
+//create_duplicate_nodegroup.aql
+//create_duplicate_type.aql
+//drop_nonexistent_dataset.aql
+//drop_nonexistent_datatype.aql
+//drop_nonexistent_dataverse.aql
+//drop_nonexistent_index.aql
+//drop_nonexistent_nodegroup.aql
+//rollback_drop_dataset.aql
+//rollback_drop_datatype.aql
+//rollback_drop_dataverse.aql
+//rollback_drop_index.aql
+//rollback_drop_nodegroup.aql
+//rollback_new_dataset.aql
+//rollback_new_datatype.aql
+//rollback_new_dataverse.aql
+//rollback_new_index.aql
+//rollback_new_nodegroup.aql
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 98d6ed3..c572ed6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -78,6 +78,7 @@
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
public class MetadataNode implements IMetadataNode {
private static final long serialVersionUID = 1L;
@@ -263,7 +264,9 @@
transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.insert(tuple);
- index.getTreeLogger().generateLogRecord(transactionProvider, txnCtx, IndexOperation.INSERT, tuple);
+ //TODO: extract the key from the tuple and get the PKHashValue from the key.
+ //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
+ // resourceID, IndexOperation.INSERT, tuple, null, null);
indexLifecycleManager.close(resourceID);
}
@@ -515,7 +518,10 @@
// regular waiters in the LockManager.
transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
indexAccessor.delete(tuple);
- index.getTreeLogger().generateLogRecord(transactionProvider, txnCtx, IndexOperation.DELETE, tuple);
+ //TODO: extract the key from the tuple and get the PKHashValue from the key.
+ //check how to get the oldValue.
+ //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
+ // resourceID, IndexOperation.DELETE, tuple, operation, null);
indexLifecycleManager.close(resourceID);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
index 88dac70..8554ddd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
@@ -19,14 +19,14 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeLogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
/**
* Descriptor interface for a primary or secondary index on metadata datasets.
@@ -66,7 +66,7 @@
public void setFileId(int fileId);
- public void initTreeLogger(ITreeIndex treeIndex) throws ACIDException;
+ public void initIndexLogger(IIndex index) throws ACIDException;
public int getFileId();
@@ -74,9 +74,7 @@
public long getResourceID();
- public byte[] getResourceId();
-
- public TreeLogger getTreeLogger();
+ public IndexLogger getIndexLogger();
public DatasetId getDatasetId();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 3b5af14..a1a510c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -44,8 +44,8 @@
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
-import edu.uci.ics.asterix.transaction.management.service.logging.DataUtil;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -54,8 +54,8 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
@@ -128,8 +128,8 @@
boolean isNewUniverse = true;
TransactionalResourceRepository resourceRepository = runtimeContext.getTransactionProvider()
.getTransactionalResourceRepository();
- resourceRepository.registerTransactionalResourceManager(TreeResourceManager.ID, new TreeResourceManager(
- runtimeContext.getTransactionProvider()));
+ resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_BTREE, new IndexResourceManager(
+ ResourceType.LSM_BTREE, runtimeContext.getTransactionProvider()));
metadataNodeName = asterixProperties.getMetadataNodeName();
isNewUniverse = asterixProperties.isNewUniverse();
@@ -205,13 +205,12 @@
}
}
- private static void registerTransactionalResource(IMetadataIndex index,
+ private static void registerTransactionalResource(IMetadataIndex metadataIndex,
TransactionalResourceRepository resourceRepository) throws ACIDException {
- long resourceID = index.getResourceID();
- ITreeIndex treeIndex = (ITreeIndex) indexLifecycleManager.getIndex(resourceID);
- byte[] resourceId = DataUtil.longToByteArray(resourceID);
- resourceRepository.registerTransactionalResource(resourceId, treeIndex);
- index.initTreeLogger(treeIndex);
+ long resourceId = metadataIndex.getResourceID();
+ IIndex index = indexLifecycleManager.getIndex(resourceId);
+ resourceRepository.registerTransactionalResource(resourceId, index);
+ metadataIndex.initIndexLogger(index);
}
public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
index d1e377e..c069d0d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
@@ -28,16 +28,16 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.DataUtil;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeLogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
/**
* Descriptor for a primary or secondary index on metadata datasets.
@@ -67,11 +67,10 @@
protected FileReference file;
// Identifier of file BufferCache backing this metadata btree index.
protected int fileId;
- protected long resourceID;
// Resource id of this index for use in transactions.
- protected byte[] indexResourceId;
+ protected long resourceId;
// Logger for tree indexes.
- private TreeLogger treeLogger;
+ private IndexLogger indexLogger;
// datasetId
private final DatasetId datasetId;
@@ -205,12 +204,11 @@
@Override
public void setFileId(int fileId) {
this.fileId = fileId;
- this.indexResourceId = DataUtil.intToByteArray(fileId);
}
@Override
- public void initTreeLogger(ITreeIndex treeIndex) throws ACIDException {
- this.treeLogger = new TreeLogger(indexResourceId, treeIndex);
+ public void initIndexLogger(IIndex index) throws ACIDException {
+ this.indexLogger = new IndexLogger(resourceId, ResourceType.LSM_BTREE, index);
}
@Override
@@ -223,13 +221,8 @@
return payloadType;
}
- @Override
- public byte[] getResourceId() {
- return indexResourceId;
- }
-
- public TreeLogger getTreeLogger() {
- return treeLogger;
+ public IndexLogger getIndexLogger() {
+ return indexLogger;
}
@Override
@@ -244,13 +237,12 @@
@Override
public void setResourceID(long resourceID) {
- this.resourceID = resourceID;
- this.indexResourceId = DataUtil.longToByteArray(resourceID);
+ this.resourceId = resourceID;
}
@Override
public long getResourceID() {
- return resourceID;
+ return resourceId;
}
@Override
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index c440c2a..cb7d4e2 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -32,5 +32,10 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
index 25309e1..5997bf1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
@@ -20,6 +20,7 @@
import java.util.Map;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.MutableResourceId;
/**
* Represents a repository containing Resource Managers and Resources in the
@@ -32,17 +33,18 @@
*/
public class TransactionalResourceRepository {
- private Map<ByteBuffer, Object> resourceRepository = new HashMap<ByteBuffer, Object>(); // repository
+ private Map<MutableResourceId, Object> resourceRepository = new HashMap<MutableResourceId, Object>(); // repository
private Map<Byte, IResourceManager> resourceMgrRepository = new HashMap<Byte, IResourceManager>(); // repository
+
+ private MutableResourceId mutableResourceId = new MutableResourceId(0);
- public void registerTransactionalResource(byte[] resourceBytes, Object resource) {
- // convert to ByteBuffer so that a byte[] can be used as a key in a hash map.
- ByteBuffer resourceId = ByteBuffer.wrap(resourceBytes); // need to
-
+ public void registerTransactionalResource(long resourceId, Object resource) {
synchronized (resourceRepository) {
+ mutableResourceId.setId(resourceId);
if (resourceRepository.get(resourceId) == null) {
- resourceRepository.put(resourceId, resource);
+ MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
+ resourceRepository.put(newMutableResourceId, resource);
// wake up threads waiting for the resource
resourceRepository.notifyAll();
@@ -61,10 +63,10 @@
}
}
- public Object getTransactionalResource(byte[] resourceIdBytes) {
- ByteBuffer buffer = ByteBuffer.wrap(resourceIdBytes);
+ public Object getTransactionalResource(long resourceId) {
synchronized (resourceRepository) {
- while (resourceRepository.get(buffer) == null) {
+ mutableResourceId.setId(resourceId);
+ while (resourceRepository.get(mutableResourceId) == null) {
try {
resourceRepository.wait();
} catch (InterruptedException ie) {
@@ -73,7 +75,7 @@
// failures occurring elsewhere, break from the loop
}
}
- return resourceRepository.get(buffer);
+ return resourceRepository.get(mutableResourceId);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java
index 4b8d62f..91ce7ed5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java
@@ -47,6 +47,13 @@
return bytes;
}
+ public static long byteArrayToLong(byte[] bytes, int offset) {
+ return ((bytes[offset] & 0xff) << 56) + ((bytes[offset + 1] & 0xff) << 48) + ((bytes[offset + 2] & 0xff) << 40)
+ + ((bytes[offset + 3] & 0xff) << 32) + ((bytes[offset + 4] & 0xff) << 24)
+ + ((bytes[offset + 5] & 0xff) << 16) + ((bytes[offset + 6] & 0xff) << 8)
+ + ((bytes[offset + 7] & 0xff) << 0);
+ }
+
public static byte[] longToByteArray(long value) {
byte[] bytes = new byte[8];
bytes[0] = (byte) ((value >>> 56) & 0xFF);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index 1158029..40f716c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -15,51 +15,30 @@
package edu.uci.ics.asterix.transaction.management.service.logging;
import java.io.IOException;
-import java.util.Map;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
public interface ILogManager {
/**
- * An API to write a log record.
- *
- * @param logicalLogLocator
- * A reusable object passed in by the caller. When the call
- * returns, this object has the physical location of the log
- * record that was written.
- * @param context
- * the transaction context associated with the transaction that
- * is writing the log record
- * @param resourceMgrId
- * the unique identifier of the resource manager that would be
- * handling (interpreting) the log record if there is a need to
- * process and apply the log record during a redo/undo task.
- * @param pageId
- * the unique identifier of the page where the operation
- * corresponding to the log record is applied
* @param logType
- * the type of log record (@see LogType)
- * @param logActionType
- * the action that needs to be taken when processing the log
- * record (@see LogActionType)
- * @param length
- * the length of the content inside the log record. This does not
- * include the header or the checksum size.
+ * @param context
+ * @param datasetId
+ * @param PKHashValue
+ * @param resourceId
+ * @param resourceMgrId
+ * @param logContentSize
+ * @param reusableLogContentObject
* @param logger
- * an implementation of the @see ILogger interface that is
- * invoked by the ILogManager instance to get the actual content
- * for the log record.
- * @param loggerArguments
- * Represent any additional arguments that needs to be passed
- * back in the call the to ILogger interface APIs.
+ * @param logicalLogLocator
* @throws ACIDException
*/
- public void log(LogicalLogLocator logicalLogLocator, TransactionContext context, byte resourceMgrId, long pageId,
- byte logType, byte logActionType, int length, ILogger logger, Map<Object, Object> loggerArguments)
- throws ACIDException;
+ void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
+ byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
+ LogicalLogLocator logicalLogLocator) throws ACIDException;
/**
* @param physicalLogLocator
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index ab4bfea..4f28c7d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -25,38 +25,41 @@
public interface ILogRecordHelper {
- public byte getLogType(LogicalLogLocator logicalLogLocator);
+ byte getLogType(LogicalLogLocator logicalLogLocator);
- public int getLogLength(LogicalLogLocator logicalLogLocator);
+ int getJobId(LogicalLogLocator logicalLogLocator);
- public long getLogTimestamp(LogicalLogLocator logicalLogLocator);
+ int getDatasetId(LogicalLogLocator logicalLogLocator);
- public long getLogChecksum(LogicalLogLocator logicalLogLocator);
+ int getPKHashValue(LogicalLogLocator logicalLogLocator);
- public long getLogTransactionId(LogicalLogLocator logicalLogLocator);
+ PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
- public byte getResourceMgrId(LogicalLogLocator logicalLogLocator);
+ boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
+
+ long getResourceId(LogicalLogLocator logicalLogLocator);
+
+ byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
- public long getPageId(LogicalLogLocator logicalLogLocator);
+ int getLogRecordSize(LogicalLogLocator logicalLogLocater);
- public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
+ long getLogChecksum(LogicalLogLocator logicalLogLocator);
- public int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
+ int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
- public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
+ int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
- public byte getLogActionType(LogicalLogLocator logicalLogLocator);
+ String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
- public PhysicalLogLocator getPreviousLsnByTransaction(LogicalLogLocator logicalLogLocator);
+ void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
+ int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId, int logRecordSize);
- public boolean getPreviousLsnByTransaction(PhysicalLogLocator physicalLogLocator,
- LogicalLogLocator logicalLogLocator);
+ boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
- public void writeLogHeader(TransactionContext context, LogicalLogLocator logicalLogLocator, byte resourceMgrId,
- long pageId, byte logType, byte logActionType, int logContentSize, long prevLsnValue);
+ int getLogRecordSize(byte logType, int logBodySize);
- public void writeLogTail(LogicalLogLocator logicalLogLocator, ILogManager logManager);
+ int getLogHeaderSize(byte logType);
- public boolean validateLogRecord(LogManagerProperties logManagerProperties, LogicalLogLocator logicalLogLocator);
+ int getLogChecksumSize();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java
index 92984cb..e26a3cc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.asterix.transaction.management.service.logging;
-import java.util.Map;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
/**
@@ -25,11 +25,11 @@
*/
public interface ILogger {
- public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException;
+ public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException;
- public void log(TransactionContext context, final LogicalLogLocator logicalLogLocator, int logRecordSize,
- Map<Object, Object> loggerArguments) throws ACIDException;
+ public void log(TransactionContext context, final LogicalLogLocator logicalLogLocator, int logContentSize,
+ ReusableLogContentObject reusableLogContentObject) throws ACIDException;
- public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException;
+ public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
new file mode 100644
index 0000000..48a9dbf
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.resource.ICloseable;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
+
+
+
+public class IndexLogger implements ILogger, ICloseable {
+
+ private final Map<Object, Object> jobId2ReusableLogContentObjectRepositoryMap = new ConcurrentHashMap<Object, Object>();
+
+ public static final String TREE_INDEX = "TREE_INDEX";
+ public static final String TUPLE_REFERENCE = "TUPLE_REFERENCE";
+ public static final String TUPLE_WRITER = "TUPLE_WRITER";
+ public static final String INDEX_OPERATION = "INDEX_OPERATION";
+ public static final String RESOURCE_ID = "RESOURCE_ID";
+
+ private final long resourceId;
+ private final byte resourceType;
+ private final SimpleTupleWriter tupleWriter;
+
+ public class BTreeOperationCodes {
+ public static final byte INSERT = 0;
+ public static final byte DELETE = 1;
+ }
+
+ public IndexLogger(long resourceId, byte resourceType, IIndex index) {
+ this.resourceId = resourceId;
+ this.resourceType = resourceType;
+ this.tupleWriter = new SimpleTupleWriter();
+ }
+
+ public synchronized void close(TransactionContext context) {
+ ReusableLogContentObjectRepository txnThreadStateRepository = (ReusableLogContentObjectRepository) jobId2ReusableLogContentObjectRepositoryMap
+ .get(context.getJobId());
+ txnThreadStateRepository.remove(Thread.currentThread().getId());
+ jobId2ReusableLogContentObjectRepositoryMap.remove(context.getJobId());
+ }
+
+ public void generateLogRecord(TransactionProvider provider, TransactionContext context, int datasetId,
+ int PKHashValue, long resourceId, IndexOperation newOperation, ITupleReference newValue,
+ IndexOperation oldOperation, ITupleReference oldValue) throws ACIDException {
+
+ if (this.resourceId != resourceId) {
+ throw new ACIDException("IndexLogger mistach");
+ }
+
+ context.addCloseableResource(this); // the close method would be called
+ // on this TreeLogger instance at
+ // the time of transaction
+ // commit/abort.
+ if (newOperation != IndexOperation.INSERT && newOperation != IndexOperation.DELETE) {
+ throw new ACIDException("Loging for Operation " + newOperation + " not supported");
+ }
+
+ ReusableLogContentObject reusableLogContentObject = null;
+ ReusableLogContentObjectRepository reusableLogContentObjectRepository = null;
+ reusableLogContentObjectRepository = (ReusableLogContentObjectRepository) jobId2ReusableLogContentObjectRepositoryMap
+ .get(context.getJobId());
+ if (reusableLogContentObjectRepository == null) {
+ synchronized (context) { // threads belonging to different
+ // transaction do not need to
+ // synchronize amongst them.
+ if (reusableLogContentObjectRepository == null) {
+ reusableLogContentObjectRepository = new ReusableLogContentObjectRepository();
+ jobId2ReusableLogContentObjectRepositoryMap.put(context.getJobId(),
+ reusableLogContentObjectRepository);
+ }
+ }
+ }
+
+ reusableLogContentObject = reusableLogContentObjectRepository.getObject(Thread.currentThread().getId());
+ if (reusableLogContentObject == null) {
+ LogicalLogLocator logicalLogLocator = LogUtil.getDummyLogicalLogLocator(provider.getLogManager());
+ reusableLogContentObject = new ReusableLogContentObject(logicalLogLocator, newOperation, newValue,
+ oldOperation, oldValue);
+ reusableLogContentObjectRepository.putObject(Thread.currentThread().getId(), reusableLogContentObject);
+ } else {
+ reusableLogContentObject.setNewOperation(newOperation);
+ reusableLogContentObject.setNewValue(newValue);
+ reusableLogContentObject.setOldOperation(oldOperation);
+ reusableLogContentObject.setOldValue(oldValue);
+ }
+
+ int logContentSize = 4/*TupleFieldCount*/+ 1/*NewOperation*/+ 4/*newValueLength*/+ tupleWriter
+ .bytesRequired(newValue);
+ logContentSize += 1/*OldOperation*/+ 4/*oldValueLength*/+ tupleWriter.bytesRequired(oldValue);
+
+ provider.getLogManager().log(LogType.UPDATE, context, datasetId, PKHashValue, resourceId, resourceType,
+ logContentSize, reusableLogContentObject, this, reusableLogContentObject.getLogicalLogLocator());
+ }
+
+ @Override
+ public void log(TransactionContext context, LogicalLogLocator logicalLogLocator, int logContentSize,
+ ReusableLogContentObject reusableLogContentObject) throws ACIDException {
+ int offset = 0;
+ int tupleSize = 0;
+
+ //tuple field count
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, reusableLogContentObject
+ .getNewValue().getFieldCount());
+ offset += 4;
+
+ //new operation
+ (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + offset,
+ (byte) reusableLogContentObject.getNewOperation().ordinal());
+ offset += 1;
+
+ //new tuple size
+ tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getNewValue());
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
+ offset += 4;
+
+ //new tuple
+ tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer().getArray(),
+ logicalLogLocator.getMemoryOffset() + offset);
+ offset += tupleSize;
+
+ if (resourceType == ResourceType.LSM_BTREE) {
+ //old operation
+ (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + offset,
+ (byte) reusableLogContentObject.getOldOperation().ordinal());
+ offset += 1;
+
+ if (reusableLogContentObject.getOldOperation() != IndexOperation.NOOP) {
+ //old tuple size
+ tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getOldValue());
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
+ offset += 4;
+
+ //old tuple
+ tupleWriter.writeTuple(reusableLogContentObject.getNewValue(),
+ logicalLogLocator.getBuffer().getArray(), logicalLogLocator.getMemoryOffset() + offset);
+ }
+ }
+ }
+
+ @Override
+ public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject)
+ throws ACIDException {
+ }
+
+ @Override
+ public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject)
+ throws ACIDException {
+ }
+
+ /**
+ * Represents a utility class for generating log records corresponding to
+ * operations on a ITreeIndex implementation. A TreeLogger instance is thread
+ * safe and can be shared across multiple threads that may belong to same or
+ * different transactions.
+ */
+ public class ReusableLogContentObjectRepository {
+
+ private final Map<Long, ReusableLogContentObject> id2Object = new HashMap<Long, ReusableLogContentObject>();
+
+ public synchronized ReusableLogContentObject getObject(long threadId) {
+ return id2Object.get(threadId);
+ }
+
+ public synchronized void putObject(long threadId, ReusableLogContentObject reusableLogContentObject) {
+ this.id2Object.put(threadId, reusableLogContentObject);
+ }
+
+ public synchronized void remove(long threadId) {
+ id2Object.remove(threadId);
+ }
+ }
+
+ /**
+ * Represents the state of a transaction thread. The state contains information
+ * that includes the tuple being operated, the operation and the location of the
+ * log record corresponding to the operation.
+ */
+ public class ReusableLogContentObject {
+
+ private LogicalLogLocator logicalLogLocator;
+ private IndexOperation newOperation;
+ private ITupleReference newValue;
+ private IndexOperation oldOperation;
+ private ITupleReference oldValue;
+
+ public ReusableLogContentObject(LogicalLogLocator logicalLogLocator, IndexOperation newOperation,
+ ITupleReference newValue, IndexOperation oldOperation, ITupleReference oldValue) {
+ this.logicalLogLocator = logicalLogLocator;
+ this.newOperation = newOperation;
+ this.newValue = newValue;
+ this.oldOperation = oldOperation;
+ this.oldValue = oldValue;
+ }
+
+ public synchronized LogicalLogLocator getLogicalLogLocator() {
+ return logicalLogLocator;
+ }
+
+ public synchronized void setLogicalLogLocator(LogicalLogLocator logicalLogLocator) {
+ this.logicalLogLocator = logicalLogLocator;
+ }
+
+ public synchronized void setNewOperation(IndexOperation newOperation) {
+ this.newOperation = newOperation;
+ }
+
+ public synchronized IndexOperation getNewOperation() {
+ return newOperation;
+ }
+
+ public synchronized void setNewValue(ITupleReference newValue) {
+ this.newValue = newValue;
+ }
+
+ public synchronized ITupleReference getNewValue() {
+ return newValue;
+ }
+
+ public synchronized void setOldOperation(IndexOperation oldOperation) {
+ this.oldOperation = oldOperation;
+ }
+
+ public synchronized IndexOperation getOldOperation() {
+ return oldOperation;
+ }
+
+ public synchronized void setOldValue(ITupleReference oldValue) {
+ this.oldValue = oldValue;
+ }
+
+ public synchronized ITupleReference getOldValue() {
+ return oldValue;
+ }
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
new file mode 100644
index 0000000..54790b3
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.MutableResourceId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+
+public class IndexLoggerRepository {
+
+ private final Map<MutableResourceId, IndexLogger> loggers = new HashMap<MutableResourceId, IndexLogger>();
+ private final TransactionProvider provider;
+ private MutableResourceId mutableResourceId;
+
+ public IndexLoggerRepository(TransactionProvider provider) {
+ this.provider = provider;
+ mutableResourceId = new MutableResourceId(0);
+ }
+
+ public synchronized IndexLogger getIndexLogger(long resourceId, byte resourceType) {
+ mutableResourceId.setId(resourceId);
+ IndexLogger logger = loggers.get(mutableResourceId);
+ if (logger == null) {
+ MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
+ IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
+ logger = new IndexLogger(resourceId, resourceType, index);
+ loggers.put(newMutableResourceId, logger);
+ }
+ return logger;
+ }
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
new file mode 100644
index 0000000..19f055c
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public class IndexResourceManager implements IResourceManager {
+
+ public final byte resourceType;
+
+ private final TransactionProvider provider;
+
+ public IndexResourceManager(byte resourceType, TransactionProvider provider) {
+ this.resourceType = resourceType;
+ this.provider = provider;
+ }
+
+ public byte getResourceManagerId() {
+ return resourceType;
+ }
+
+ public void undo(ILogRecordHelper logRecordHelper, LogicalLogLocator logLocator) throws ACIDException {
+ long resourceId = logRecordHelper.getResourceId(logLocator);
+ int offset = logRecordHelper.getLogContentBeginPos(logLocator);
+
+ /*
+ byte[] logBufferContent = logLocator.getBuffer().getArray();
+ // read the length of resource id byte array
+ int resourceIdLength = DataUtil.byteArrayToInt(logBufferContent, logContentBeginPos);
+ byte[] resourceIdBytes = new byte[resourceIdLength];
+
+ // copy the resource if bytes
+ System.arraycopy(logBufferContent, logContentBeginPos + 4, resourceIdBytes, 0, resourceIdLength);
+ */
+
+ // look up the repository to obtain the resource object
+ IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
+
+ /* field count */
+ int fieldCount = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+ offset += 4;
+
+ /* new operation */
+ byte newOperation = logLocator.getBuffer().getByte(logLocator.getMemoryOffset() + offset);
+ offset += 1;
+
+ /* new value size */
+ int newValueSize = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+ offset += 4;
+
+ /* new value */
+ SimpleTupleWriter tupleWriter = new SimpleTupleWriter();
+ SimpleTupleReference newTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+ newTuple.setFieldCount(fieldCount);
+ newTuple.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), offset);
+ offset += newValueSize;
+
+ ILSMIndexAccessor indexAccessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+
+ try {
+ if (resourceType == ResourceType.LSM_BTREE) {
+
+ /* old operation */
+ byte oldOperation = logLocator.getBuffer().getByte(logLocator.getMemoryOffset() + offset);
+ offset += 1;
+
+ if (oldOperation != (byte) IndexOperation.NOOP.ordinal()) {
+ /* old value size */
+ int oldValueSize = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+ offset += 4;
+
+ /* old value */
+ SimpleTupleReference oldTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+ oldTuple.setFieldCount(fieldCount);
+ oldTuple.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), offset);
+ offset += oldValueSize;
+
+ if (oldOperation == (byte) IndexOperation.DELETE.ordinal()) {
+ indexAccessor.delete(oldTuple);
+ } else {
+ indexAccessor.insert(oldTuple);
+ }
+ } else {
+ indexAccessor.physicalDelete(newTuple);
+ }
+ } else {
+ //For LSMRtree and LSMInvertedIndex
+ //delete --> physical delete
+ //insert --> logical delete
+ if (newOperation == (byte) IndexOperation.DELETE.ordinal()) {
+ indexAccessor.physicalDelete(newTuple);
+ } else {
+ indexAccessor.delete(newTuple);
+ }
+ }
+ } catch (Exception e) {
+ throw new ACIDException("Undo failed", e);
+ }
+ }
+
+ public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logicalLogLocator) throws ACIDException {
+ throw new UnsupportedOperationException(" Redo logic will be implemented as part of crash recovery feature");
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 7ba9130..8e89301 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -77,8 +77,11 @@
int integerRead = -1;
boolean logRecordBeginPosFound = false;
long bytesSkipped = 0;
+
+ //check whether the currentOffset has enough space to have new log record by comparing
+ //the smallest log record type(which is commit)'s log header.
while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
- - logManager.getLogManagerProperties().getLogHeaderSize()) {
+ - logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
if (integerRead == logManager.getLogManagerProperties().logMagicNumber) {
logRecordBeginPosFound = true;
@@ -109,8 +112,8 @@
}
}
- int logLength = logManager.getLogRecordHelper().getLogLength(logicalLogLocator);
- if (logManager.getLogRecordHelper().validateLogRecord(logManager.getLogManagerProperties(), logicalLogLocator)) {
+ int logLength = logManager.getLogRecordHelper().getLogRecordSize(logicalLogLocator);
+ if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
if (nextLogicalLogLocator == null) {
nextLogicalLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 1d1e067..5736105 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -21,7 +21,6 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +29,7 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
@@ -495,33 +495,33 @@
}
}
- public void log(LogicalLogLocator logLocator, TransactionContext context, byte resourceMgrId, long pageId,
- byte logType, byte logActionType, int requestedSpaceForLog, ILogger logger,
- Map<Object, Object> loggerArguments) throws ACIDException {
+ @Override
+ public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
+ byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject,
+ ILogger logger, LogicalLogLocator logicalLogLocator) throws ACIDException {
/*
* logLocator is a re-usable object that is appropriately set in each
* invocation. If the reference is null, the log manager must throw an
* exception
*/
- if (logLocator == null) {
+ if (logicalLogLocator == null) {
throw new ACIDException(
" you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
+ "log manager can set it approporiately for you");
}
// compute the total log size including the header and the checksum.
- int totalLogSize = logManagerProperties.getLogHeaderSize() + requestedSpaceForLog
- + logManagerProperties.getLogChecksumSize();
+ int totalLogSize = logRecordHelper.getLogRecordSize(logType, logContentSize);
// check for the total space requirement to be less than a log page.
if (totalLogSize > logManagerProperties.getLogPageSize()) {
throw new ACIDException(
" Maximum Log Content Size is "
- + (logManagerProperties.getLogPageSize() - logManagerProperties.getLogHeaderSize() - logManagerProperties
+ + (logManagerProperties.getLogPageSize() - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
.getLogChecksumSize()));
}
- // all constraints checked and we are goot to go and acquire a lsn.
+ // all constraints checked and we are good to go and acquire a lsn.
long previousLogLocator = -1;
long myLogLocator; // the will be set to the location (a long value)
// where the log record needs to be placed.
@@ -537,7 +537,7 @@
previousLogLocator = context.getLastLogLocator().getLsn();
myLogLocator = getLsn(totalLogSize, logType);
context.getLastLogLocator().setLsn(myLogLocator);
- logLocator.setLsn(myLogLocator);
+ logicalLogLocator.setLsn(myLogLocator);
}
/*
@@ -564,43 +564,43 @@
try {
- logLocator.setBuffer(logPages[pageIndex]);
+ logicalLogLocator.setBuffer(logPages[pageIndex]);
int pageOffset = getLogPageOffset(myLogLocator);
- logLocator.setMemoryOffset(pageOffset);
+ logicalLogLocator.setMemoryOffset(pageOffset);
/*
* write the log header.
*/
- logRecordHelper.writeLogHeader(context, logLocator, resourceMgrId, pageId, logType, logActionType,
- requestedSpaceForLog, previousLogLocator);
-
+ logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLogLocator,
+ resourceId, resourceMgrId, logContentSize);
+
// increment the offset so that the transaction can fill up the
// content in the correct region of the allocated space.
- logLocator.increaseMemoryOffset(logManagerProperties.getLogHeaderSize());
+ logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
// a COMMIT log record does not have any content
// and hence the logger (responsible for putting the log content) is
// not invoked.
- if (requestedSpaceForLog != 0) {
- logger.preLog(context, loggerArguments);
+ if (logContentSize != 0) {
+ logger.preLog(context, reusableLogContentObject);
}
- if (requestedSpaceForLog != 0) {
+ if (logContentSize != 0) {
// call the logger implementation and ask to fill in the log
// record content at the allocated space.
- logger.log(context, logLocator, requestedSpaceForLog, loggerArguments);
- logger.postLog(context, loggerArguments);
+ logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
+ logger.postLog(context, reusableLogContentObject);
}
/*
* The log record has been written. For integrity checks, compute
* the checksum and put it at the end of the log record.
*/
- int startPosChecksum = logLocator.getMemoryOffset() - logManagerProperties.getLogHeaderSize();
- int length = totalLogSize - logManagerProperties.getLogChecksumSize();
+ int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
+ int length = totalLogSize - logRecordHelper.getLogChecksumSize();
long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
- logPages[pageIndex].writeLong(pageOffset + logManagerProperties.getLogHeaderSize() + requestedSpaceForLog,
- checksum);
+ logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType)
+ + logContentSize, checksum);
/*
* release the ownership as the log record has been placed in
@@ -704,7 +704,7 @@
buffer.limit(buffer.getInt(4));
MemBasedBuffer memBuffer = new MemBasedBuffer(buffer.slice());
logicalLogLocator = new LogicalLogLocator(physicalLogLocator.getLsn(), memBuffer, 0, this);
- if (!logRecordHelper.validateLogRecord(logManagerProperties, logicalLogLocator)) {
+ if (!logRecordHelper.validateLogRecord(logicalLogLocator)) {
throw new ACIDException(" invalid log record at lsn " + physicalLogLocator.getLsn());
}
} catch (Exception fnfe) {
@@ -761,7 +761,7 @@
logLocator = new LogicalLogLocator(lsnValue, memBuffer, 0, this);
try {
// validate the log record by comparing checksums
- if (!logRecordHelper.validateLogRecord(logManagerProperties, logLocator)) {
+ if (!logRecordHelper.validateLogRecord(logLocator)) {
throw new ACIDException(" invalid log record at lsn " + physicalLogLocator);
}
} catch (Exception e) {
@@ -779,6 +779,7 @@
return readDiskLog(physicalLogLocator);
}
+ @Override
public ILogRecordHelper getLogRecordHelper() {
return logRecordHelper;
}
@@ -789,6 +790,7 @@
* logic to event based when log manager support is integrated with the
* Buffer Manager.
*/
+ @Override
public synchronized void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException {
if (logicalLogLocator.getLsn() > lsn.get()) {
throw new ACIDException(" invalid lsn " + logicalLogLocator.getLsn());
@@ -828,6 +830,7 @@
return pageNo == 0 ? numLogPages - 1 : pageNo - 1;
}
+ @Override
public LogManagerProperties getLogManagerProperties() {
return logManagerProperties;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index 14b45b6..fe48743 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -48,14 +48,6 @@
// flushing.
private int logBufferSize = logPageSize * numLogPages;
- private final int logHeaderSize = 43; /*
- * ( magic number(4) + (length(4) +
- * type(1) + actionType(1) +
- * timestamp(8) + transacitonId(8) +
- * resourceMgrId(1) + pageId(8) +
- * prevLSN(8)
- */
- private int logTailSize = 8; /* checksum(8) */
public int logMagicNumber = 123456789;
public static final String LOG_PARTITION_SIZE_KEY = "log_partition_size";
@@ -104,18 +96,6 @@
return logDir;
}
- public int getLogHeaderSize() {
- return logHeaderSize;
- }
-
- public int getLogChecksumSize() {
- return logTailSize;
- }
-
- public int getTotalLogRecordLength(int logContentSize) {
- return logContentSize + logHeaderSize + logTailSize;
- }
-
public int getLogPageSize() {
return logPageSize;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 7575b45..ef8fc54 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -21,84 +21,83 @@
* for writing/reading of log header and checksum as well as validating log
* record by checksum comparison. Every ILogManager implementation has an
* associated ILogRecordHelper implementation.
+ * == LogRecordFormat ==
+ * [Header]
+ * --------------------------- Header part1(17) : Both COMMIT and UPDATE log type have part1 fields
+ * LogMagicNumber(4)
+ * LogType(1)
+ * JobId(4)
+ * DatasetId(4) //stored in dataset_dataset in Metadata Node
+ * PKHashValue(4)
+ * --------------------------- Header part2(21) : Only UPDATE log type has part2 fields
+ * PrevLSN(8) //only for UPDATE
+ * ResourceId(8) //stored in .metadata of the corresponding index in NC node
+ * ResourceMgrId(1)
+ * LogRecordSize(4)
+ * --------------------------- COMMIT doesn't have Body fields.
+ * [Body] The Body size is given through the parameter reusableLogContentObjectLength
+ * NewOp(1)
+ * NewValueLength(4)
+ * NewValue(NewValueLength)
+ * OldOp(1)
+ * OldValueLength(4)
+ * OldValue(OldValueLength)
+ * --------------------------- Both COMMIT and UPDATE have tail fields.
+ * [Tail]
+ * Checksum(8)
*/
public class LogRecordHelper implements ILogRecordHelper {
- private final int BEGIN_MAGIC_NO_POS = 0;
- private final int BEGING_LENGTH_POS = 4;
- private final int BEGIN_TYPE_POS = 8;
- private final int BEGIN_ACTION_TYPE_POS = 9;
- private final int BEGIN_TIMESTAMP_POS = 10;
- private final int BEGIN_TRANSACTION_ID_POS = 18;
- /*
- private final int BEGIN_RESOURCE_MGR_ID_POS = 26;
- private final int BEGIN_PAGE_ID_POS = 27;
- private final int BEGIN_PREV_LSN_POS = 35;
- */
- private final int BEGIN_RESOURCE_MGR_ID_POS = 22;
- private final int BEGIN_PAGE_ID_POS = 23;
- private final int BEGIN_PREV_LSN_POS = 31;
+ private final int LOG_CHECKSUM_SIZE = 8;
-
+ private final int MAGIC_NO_POS = 0;
+ private final int LOG_TYPE_POS = 4;
+ private final int JOB_ID_POS = 5;
+ private final int DATASET_ID_POS = 9;
+ private final int PK_HASH_VALUE_POS = 13;
+ private final int PREV_LSN_POS = 17;
+ private final int RESOURCE_ID_POS = 25;
+ private final int RESOURCE_MGR_ID_POS = 33;
+ private final int LOG_RECORD_SIZE_POS = 34;
+
private ILogManager logManager;
public LogRecordHelper(ILogManager logManager) {
this.logManager = logManager;
}
+ @Override
public byte getLogType(LogicalLogLocator logicalLogLocator) {
- return logicalLogLocator.getBuffer().getByte(logicalLogLocator.getMemoryOffset() + BEGIN_TYPE_POS);
+ return logicalLogLocator.getBuffer().getByte(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS);
}
- public byte getLogActionType(LogicalLogLocator logicalLogLocator) {
- return logicalLogLocator.getBuffer().getByte(logicalLogLocator.getMemoryOffset() + BEGIN_ACTION_TYPE_POS);
+ @Override
+ public int getJobId(LogicalLogLocator logicalLogLocator) {
+ return logicalLogLocator.getBuffer().readInt(logicalLogLocator.getMemoryOffset() + JOB_ID_POS);
}
- public int getLogLength(LogicalLogLocator logicalLogLocator) {
- return (logicalLogLocator.getBuffer()).readInt(logicalLogLocator.getMemoryOffset() + BEGING_LENGTH_POS);
+ @Override
+ public int getDatasetId(LogicalLogLocator logicalLogLocator) {
+ return logicalLogLocator.getBuffer().readInt(logicalLogLocator.getMemoryOffset() + DATASET_ID_POS);
}
- public long getLogTimestamp(LogicalLogLocator logicalLogLocator) {
- return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset() + BEGIN_TIMESTAMP_POS);
+ @Override
+ public int getPKHashValue(LogicalLogLocator logicalLogLocator) {
+ return logicalLogLocator.getBuffer().readInt(logicalLogLocator.getMemoryOffset() + PK_HASH_VALUE_POS);
}
- public long getLogChecksum(LogicalLogLocator logicalLogLocator) {
- return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
- + getLogLength(logicalLogLocator) - 8);
- }
-
- public long getLogTransactionId(LogicalLogLocator logicalLogLocator) {
- return (logicalLogLocator.getBuffer()).readInt(logicalLogLocator.getMemoryOffset() + BEGIN_TRANSACTION_ID_POS);
- }
-
- public byte getResourceMgrId(LogicalLogLocator logicalLogLocator) {
- return (logicalLogLocator.getBuffer()).getByte(logicalLogLocator.getMemoryOffset() + BEGIN_RESOURCE_MGR_ID_POS);
- }
-
- public long getPageId(LogicalLogLocator logicalLogLocator) {
- return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset() + BEGIN_PAGE_ID_POS);
- }
-
- public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator) {
- return logicalLogLocator.getMemoryOffset() + logManager.getLogManagerProperties().getLogHeaderSize();
- }
-
- public int getLogContentEndPos(LogicalLogLocator logicalLogLocator) {
- return logicalLogLocator.getMemoryOffset() + getLogLength(logicalLogLocator)
- - logManager.getLogManagerProperties().getLogChecksumSize();
- }
-
- public PhysicalLogLocator getPreviousLsnByTransaction(LogicalLogLocator logicalLogLocator) {
- long prevLsnValue = (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
- + BEGIN_PREV_LSN_POS);
+ @Override
+ public PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator) {
+ long prevLsnValue = (logicalLogLocator.getBuffer())
+ .readLong(logicalLogLocator.getMemoryOffset() + PREV_LSN_POS);
PhysicalLogLocator previousLogLocator = new PhysicalLogLocator(prevLsnValue, logManager);
return previousLogLocator;
}
- public boolean getPreviousLsnByTransaction(PhysicalLogLocator physicalLogLocator,
- LogicalLogLocator logicalLogLocator) {
- long prevLsnValue = (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
- + BEGIN_PREV_LSN_POS);
+ @Override
+ public boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator) {
+ long prevLsnValue = (logicalLogLocator.getBuffer())
+ .readLong(logicalLogLocator.getMemoryOffset() + PREV_LSN_POS);
if (prevLsnValue == -1) {
return false;
}
@@ -106,6 +105,38 @@
return true;
}
+ @Override
+ public long getResourceId(LogicalLogLocator logicalLogLocator) {
+ return logicalLogLocator.getBuffer().readLong(logicalLogLocator.getMemoryOffset() + RESOURCE_ID_POS);
+ }
+
+ @Override
+ public byte getResourceMgrId(LogicalLogLocator logicalLogLocater) {
+ return logicalLogLocater.getBuffer().getByte(logicalLogLocater.getMemoryOffset() + RESOURCE_MGR_ID_POS);
+ }
+
+ @Override
+ public int getLogRecordSize(LogicalLogLocator logicalLogLocater) {
+ return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+ }
+
+ @Override
+ public long getLogChecksum(LogicalLogLocator logicalLogLocator) {
+ return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
+ + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE);
+ }
+
+ @Override
+ public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator) {
+ return logicalLogLocator.getMemoryOffset() + getLogHeaderSize(getLogType(logicalLogLocator));
+ }
+
+ @Override
+ public int getLogContentEndPos(LogicalLogLocator logicalLogLocator) {
+ return logicalLogLocator.getMemoryOffset() + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE;
+ }
+
+ @Override
public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator) {
StringBuilder builder = new StringBuilder();
byte logType = new Byte(getLogType(logicalLogLocator));
@@ -118,66 +149,92 @@
logTypeDisplay = "UPDATE";
break;
}
- builder.append(" Log Type :" + logTypeDisplay);
- builder.append(" Log Length :" + getLogLength(logicalLogLocator));
- builder.append(" Log Timestamp:" + getLogTimestamp(logicalLogLocator));
- builder.append(" Log Transaction Id:" + getLogTransactionId(logicalLogLocator));
- builder.append(" Log Resource Mgr Id:" + getResourceMgrId(logicalLogLocator));
- builder.append(" Page Id:" + getPageId(logicalLogLocator));
- builder.append(" Log Checksum:" + getLogChecksum(logicalLogLocator));
- builder.append(" Log Previous lsn: " + getPreviousLsnByTransaction(logicalLogLocator));
- return new String(builder);
- }
-
- public void writeLogHeader(TransactionContext context, LogicalLogLocator logicalLogLocator, byte resourceMgrId,
- long pageId, byte logType, byte logActionType, int logContentSize, long prevLogicalLogLocator) {
- /* magic no */
- (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + BEGIN_MAGIC_NO_POS,
- logManager.getLogManagerProperties().logMagicNumber);
-
- /* length */
- int length = logManager.getLogManagerProperties().getTotalLogRecordLength(logContentSize);
- (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + BEGING_LENGTH_POS, length);
-
- /* log type */
- (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + BEGIN_TYPE_POS, logType);
-
- /* log action type */
- (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + BEGIN_ACTION_TYPE_POS, logActionType);
-
- /* timestamp */
- long timestamp = System.currentTimeMillis();
- (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + BEGIN_TIMESTAMP_POS, timestamp);
-
- /* transaction id */
- (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + BEGIN_TRANSACTION_ID_POS,
- context.getJobId().getId());
-
- /* resource Mgr id */
- (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + BEGIN_RESOURCE_MGR_ID_POS,
- resourceMgrId);
-
- /* page id */
- (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + BEGIN_PAGE_ID_POS, pageId);
-
- /* previous LSN's File Id by the transaction */
- (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + BEGIN_PREV_LSN_POS,
- prevLogicalLogLocator);
- }
-
- public void writeLogTail(LogicalLogLocator logicalLogLocator, ILogManager logManager) {
- (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset(),
- logManager.getLogManagerProperties().logMagicNumber);
+ builder.append(" Log Type : ").append(logTypeDisplay);
+ builder.append(" Job Id : ").append(getJobId(logicalLogLocator));
+ builder.append(" Dataset Id : ").append(getDatasetId(logicalLogLocator));
+ builder.append(" PK Hash Value : ").append(getPKHashValue(logicalLogLocator));
+ builder.append(" PrevLSN : ").append(getPrevLSN(logicalLogLocator));
+ builder.append(" Resource Id : ").append(getResourceId(logicalLogLocator));
+ builder.append(" ResourceMgr Id : ").append(getResourceMgrId(logicalLogLocator));
+ builder.append(" Log Record Size : ").append(getLogRecordSize(logicalLogLocator));
+ return builder.toString();
}
@Override
- public boolean validateLogRecord(LogManagerProperties logManagerProperties, LogicalLogLocator logicalLogLocator) {
- int logLength = this.getLogLength(logicalLogLocator);
+ public void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context,
+ int datasetId, int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId,
+ int logRecordSize) {
+
+ /* magic no */
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + MAGIC_NO_POS,
+ logManager.getLogManagerProperties().logMagicNumber);
+
+ /* log type */
+ (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS, logType);
+
+ /* jobId */
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + JOB_ID_POS, context.getJobId()
+ .getId());
+
+ /* datasetId */
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + DATASET_ID_POS, datasetId);
+
+ /* PK hash value */
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + PK_HASH_VALUE_POS, PKHashValue);
+
+ if (logType == LogType.UPDATE) {
+ /* prevLSN */
+ (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + PREV_LSN_POS,
+ prevLogicalLogLocator);
+
+ /* resourceId */
+ (logicalLogLocator.getBuffer())
+ .writeLong(logicalLogLocator.getMemoryOffset() + RESOURCE_ID_POS, resourceId);
+
+ /* resourceMgr id */
+ (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + RESOURCE_MGR_ID_POS,
+ resourceMgrId);
+
+ /* log record size */
+ (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + LOG_RECORD_SIZE_POS,
+ logRecordSize);
+ }
+ }
+
+ @Override
+ public boolean validateLogRecord(LogicalLogLocator logicalLogLocator) {
+ int logLength = this.getLogRecordSize(logicalLogLocator);
long expectedChecksum = DataUtil.getChecksum(logicalLogLocator.getBuffer(),
- logicalLogLocator.getMemoryOffset(), logLength - logManagerProperties.getLogChecksumSize());
- long actualChecksum = logicalLogLocator.getBuffer().readLong(
- logicalLogLocator.getMemoryOffset() + logLength - logManagerProperties.getLogChecksumSize());
+ logicalLogLocator.getMemoryOffset(), logLength - LOG_CHECKSUM_SIZE);
+ long actualChecksum = getLogChecksum(logicalLogLocator);
return expectedChecksum == actualChecksum;
}
+ /**
+ * @param logType
+ * @param logBodySize
+ * @return
+ */
+ @Override
+ public int getLogRecordSize(byte logType, int logBodySize) {
+ if (logType == LogType.UPDATE) {
+ return 46 + logBodySize;
+ } else {
+ return 25;
+ }
+ }
+
+ @Override
+ public int getLogHeaderSize(byte logType) {
+ if (logType == LogType.UPDATE) {
+ return 38;
+ } else {
+ return 17;
+ }
+ }
+
+ @Override
+ public int getLogChecksumSize() {
+ return LOG_CHECKSUM_SIZE;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
index 72b393d..dd276c6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
@@ -18,8 +18,5 @@
public static final byte UPDATE = 0;
public static final byte COMMIT = 1;
- public static final byte CLR = 2;
- public static final byte BGN_CHPKT = 3;
- public static final byte END_CHPKT = 4;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLogger.java
deleted file mode 100644
index b742ada..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLogger.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.resource.ICloseable;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-
-/**
- * Represents a utility class for generating log records corresponding to
- * operations on a ITreeIndex implementation. A TreeLogger instance is thread
- * safe and can be shared across multiple threads that may belong to same or
- * different transactions.
- */
-class TransactionState {
-
- private final Map<Long, TxnThreadState> transactionThreads = new HashMap<Long, TxnThreadState>();
-
- public synchronized TxnThreadState getTransactionThreadState(long threadId) {
- return transactionThreads.get(threadId);
- }
-
- public synchronized void putTransactionThreadState(long threadId, TxnThreadState txnThreadState) {
- this.transactionThreads.put(threadId, txnThreadState);
- }
-
- public synchronized void remove(long threadId) {
- transactionThreads.remove(threadId);
- }
-}
-
-/**
- * Represents the state of a transaction thread. The state contains information
- * that includes the tuple being operated, the operation and the location of the
- * log record corresponding to the operation.
- */
-class TxnThreadState {
-
- private ITupleReference tuple;
- private IndexOperation indexOperation;
- private LogicalLogLocator logicalLogLocator;
-
- public TxnThreadState(LogicalLogLocator logicalLogLocator, IndexOperation indexOperation, ITupleReference tupleReference) {
- this.tuple = tupleReference;
- this.indexOperation = indexOperation;
- this.logicalLogLocator = logicalLogLocator;
- }
-
- public synchronized ITupleReference getTuple() {
- return tuple;
- }
-
- public synchronized void setTuple(ITupleReference tuple) {
- this.tuple = tuple;
- }
-
- public synchronized IndexOperation getIndexOperation() {
- return indexOperation;
- }
-
- public synchronized void setIndexOperation(IndexOperation indexOperation) {
- this.indexOperation = indexOperation;
- }
-
- public synchronized LogicalLogLocator getLogicalLogLocator() {
- return logicalLogLocator;
- }
-
- public synchronized void setLogicalLogLocator(LogicalLogLocator logicalLogLocator) {
- this.logicalLogLocator = logicalLogLocator;
- }
-
-}
-
-public class TreeLogger implements ILogger, ICloseable {
-
- private static final byte resourceMgrId = TreeResourceManager.ID;
- private final Map<Object, Object> arguments = new ConcurrentHashMap<Object, Object>();
-
- public static final String TREE_INDEX = "TREE_INDEX";
- public static final String TUPLE_REFERENCE = "TUPLE_REFERENCE";
- public static final String TUPLE_WRITER = "TUPLE_WRITER";
- public static final String INDEX_OPERATION = "INDEX_OPERATION";
- public static final String RESOURCE_ID = "RESOURCE_ID";
-
- private final ITreeIndex treeIndex;
- private final ITreeIndexTupleWriter treeIndexTupleWriter;
- private final byte[] resourceIdBytes;
- private final byte[] resourceIdLengthBytes;
-
- public class BTreeOperationCodes {
- public static final byte INSERT = 0;
- public static final byte DELETE = 1;
- }
-
- public TreeLogger(byte[] resourceIdBytes, ITreeIndex treeIndex) {
- this.resourceIdBytes = resourceIdBytes;
- this.treeIndex = treeIndex;
- treeIndexTupleWriter = treeIndex.getLeafFrameFactory().getTupleWriterFactory().createTupleWriter();
- this.resourceIdLengthBytes = DataUtil.intToByteArray(resourceIdBytes.length);
- }
-
- public synchronized void close(TransactionContext context) {
- TransactionState txnState = (TransactionState) arguments.get(context.getJobId());
- txnState.remove(Thread.currentThread().getId());
- arguments.remove(context.getJobId());
- }
-
- public void generateLogRecord(TransactionProvider provider, TransactionContext context, IndexOperation operation,
- ITupleReference tuple) throws ACIDException {
- context.addCloseableResource(this); // the close method would be called
- // on this TreeLogger instance at
- // the time of transaction
- // commit/abort.
- if (operation != IndexOperation.INSERT && operation != IndexOperation.DELETE) {
- throw new ACIDException("Loging for Operation " + operation + " not supported");
-
- }
-
- TxnThreadState txnThreadState = null;
- TransactionState txnState;
- txnState = (TransactionState) arguments.get(context.getJobId());
- if (txnState == null) {
- synchronized (context) { // threads belonging to different
- // transaction do not need to
- // synchronize amongst them.
- if (txnState == null) {
- txnState = new TransactionState();
- arguments.put(context.getJobId(), txnState);
- }
- }
- }
-
- txnThreadState = txnState.getTransactionThreadState(Thread.currentThread().getId());
- if (txnThreadState == null) {
- LogicalLogLocator logicalLogLocator = LogUtil.getDummyLogicalLogLocator(provider.getLogManager());
- txnThreadState = new TxnThreadState(logicalLogLocator, operation, tuple);
- txnState.putTransactionThreadState(Thread.currentThread().getId(), txnThreadState);
- }
- txnThreadState.setIndexOperation(operation);
- txnThreadState.setTuple(tuple);
- int tupleSize = treeIndexTupleWriter.bytesRequired(tuple);
- // Below 4 is for the int representing the length of resource id and 1
- // is for
- // the byte representing the operation
- int logContentLength = 4 + resourceIdBytes.length + 1 + tupleSize;
- provider.getLogManager().log(txnThreadState.getLogicalLogLocator(), context, resourceMgrId, 0L, LogType.UPDATE,
- LogActionType.REDO_UNDO, logContentLength, (ILogger) this, arguments);
- }
-
- @Override
- public void log(TransactionContext context, LogicalLogLocator logicalLogLocator, int logRecordSize,
- Map<Object, Object> loggerArguments) throws ACIDException {
- TransactionState txnState = (TransactionState) loggerArguments.get(context.getJobId());
- TxnThreadState state = (TxnThreadState) txnState.getTransactionThreadState(Thread.currentThread().getId());
- int count = 0;
- byte[] logBuffer = logicalLogLocator.getBuffer().getArray();
- System.arraycopy(resourceIdLengthBytes, 0, logBuffer, logicalLogLocator.getMemoryOffset(), 4);
- count += 4; // count is incremented by 4 because we wrote the length
- // that is an int and hence 4 bytes
- System.arraycopy(resourceIdBytes, 0, logBuffer, logicalLogLocator.getMemoryOffset() + count,
- resourceIdBytes.length);
- count += resourceIdBytes.length;
- logBuffer[logicalLogLocator.getMemoryOffset() + count] = (byte) state.getIndexOperation().ordinal();
- count += 1; // count is incremented by 1 to account for the byte
- // written.
- treeIndexTupleWriter.writeTuple(state.getTuple(), logicalLogLocator.getBuffer().getArray(),
- logicalLogLocator.getMemoryOffset() + count);
- }
-
- @Override
- public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
- }
-
- @Override
- public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
- }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLoggerRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLoggerRepository.java
deleted file mode 100644
index 34e1466..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLoggerRepository.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-
-public class TreeLoggerRepository {
-
- private final Map<ByteBuffer, TreeLogger> loggers = new HashMap<ByteBuffer, TreeLogger>();
- private final TransactionProvider provider;
-
- public TreeLoggerRepository(TransactionProvider provider) {
- this.provider = provider;
- }
-
- public synchronized TreeLogger getTreeLogger(byte[] resourceIdBytes) {
- ByteBuffer resourceId = ByteBuffer.wrap(resourceIdBytes);
- TreeLogger logger = loggers.get(resourceId);
- if (logger == null) {
- ITreeIndex treeIndex = (ITreeIndex) provider.getTransactionalResourceRepository().getTransactionalResource(
- resourceIdBytes);
- logger = new TreeLogger(resourceIdBytes, treeIndex);
- loggers.put(resourceId, logger);
- }
- return logger;
- }
-}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeResourceManager.java
deleted file mode 100644
index 4e6ad80..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeResourceManager.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-
-public class TreeResourceManager implements IResourceManager {
-
- public static final byte ID = (byte) 1;
-
- private final TransactionProvider provider;
-
- public TreeResourceManager(TransactionProvider provider) {
- this.provider = provider;
- }
-
- public byte getResourceManagerId() {
- return ID;
- }
-
- public void undo(ILogRecordHelper logRecordHelper, LogicalLogLocator logLocator) throws ACIDException {
- int logContentBeginPos = logRecordHelper.getLogContentBeginPos(logLocator);
- byte[] logBufferContent = logLocator.getBuffer().getArray();
- // read the length of resource id byte array
- int resourceIdLength = DataUtil.byteArrayToInt(logBufferContent, logContentBeginPos);
- byte[] resourceIdBytes = new byte[resourceIdLength];
-
- // copy the resource if bytes
- System.arraycopy(logBufferContent, logContentBeginPos + 4, resourceIdBytes, 0, resourceIdLength);
-
- // look up the repository to obtain the resource object
- ITreeIndex treeIndex = (ITreeIndex) provider.getTransactionalResourceRepository().getTransactionalResource(
- resourceIdBytes);
- int operationOffset = logContentBeginPos + 4 + resourceIdLength;
- int tupleBeginPos = operationOffset + 1;
-
- ITreeIndexTupleReference tupleReference = treeIndex.getLeafFrameFactory().getTupleWriterFactory()
- .createTupleWriter().createTupleReference();
- // TODO: remove this call.
- tupleReference.setFieldCount(tupleReference.getFieldCount());
- tupleReference.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), tupleBeginPos);
- byte operation = logBufferContent[operationOffset];
- IIndexAccessor treeIndexAccessor = treeIndex.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- try {
- switch (operation) {
- case TreeLogger.BTreeOperationCodes.INSERT:
- treeIndexAccessor.delete(tupleReference);
- break;
- case TreeLogger.BTreeOperationCodes.DELETE:
- treeIndexAccessor.insert(tupleReference);
- break;
- }
- } catch (Exception e) {
- throw new ACIDException(" could not rollback ", e);
- }
- }
-
- public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logicalLogLocator) throws ACIDException {
- throw new UnsupportedOperationException(" Redo logic will be implemented as part of crash recovery feature");
- }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index e96cc73..c810170 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -115,15 +115,7 @@
if (resourceMgr == null) {
throw new ACIDException("unknown resource mgr with id " + resourceMgrId);
} else {
- byte actionType = parser.getLogActionType(memLSN);
- switch (actionType) {
- case LogActionType.REDO:
- resourceMgr.redo(parser, memLSN);
- break;
- case LogActionType.UNDO: /* skip these records */
- break;
- default: /* do nothing */
- }
+ resourceMgr.redo(parser, memLSN);
}
writeCheckpointRecord(memLSN.getLsn());
}
@@ -204,7 +196,7 @@
byte logType = parser.getLogType(logLocator);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine(" reading LSN value inside rollback transaction method " + txnContext.getLastLogLocator()
- + " txn id " + parser.getLogTransactionId(logLocator) + " log type " + logType);
+ + " jodId " + parser.getJobId(logLocator) + " log type " + logType);
}
switch (logType) {
@@ -222,23 +214,13 @@
if (resourceMgr == null) {
throw new ACIDException(txnContext, " unknown resource manager " + resourceMgrId);
} else {
- byte actionType = parser.getLogActionType(logLocator);
- switch (actionType) {
- case LogActionType.REDO: // no need to do anything
- break;
- case LogActionType.UNDO: // undo the log record
- resourceMgr.undo(parser, logLocator);
- break;
- case LogActionType.REDO_UNDO: // undo log record
- resourceMgr.undo(parser, logLocator);
- break;
- default:
- }
+ resourceMgr.undo(parser, logLocator);
}
- case LogType.CLR: // skip the CLRs as they are not undone
break;
case LogType.COMMIT:
throw new ACIDException(txnContext, " cannot rollback commmitted transaction");
+ default:
+ throw new ACIDException("Unsupported LogType: " + logType);
}
@@ -248,7 +230,7 @@
// the logLocator object has been
// appropriately set to the location of the next log record to be
// processed as part of the roll back
- boolean moreLogs = parser.getPreviousLsnByTransaction(lsn, logLocator);
+ boolean moreLogs = parser.getPrevLSN(lsn, logLocator);
if (!moreLogs) {
// no more logs to process
break;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java
index 200527f..f7715e8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java
@@ -22,6 +22,12 @@
* Provides APIs for undo or redo of an operation on a resource.
*/
public interface IResourceManager {
+
+ public class ResourceType {
+ public static final byte LSM_BTREE = 1;
+ public static final byte LSM_RTREE = 2;
+ public static final byte LSM_INVERTED_INDEX = 3;
+ }
/**
* Returns the unique identifier for the resource manager.
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/MutableResourceId.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/MutableResourceId.java
new file mode 100644
index 0000000..5552930
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/MutableResourceId.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.asterix.transaction.management.service.transaction;
+
+public class MutableResourceId{
+ long id;
+
+ public MutableResourceId(long id) {
+ this.id = id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if ((o == null) || !(o instanceof MutableResourceId)) {
+ return false;
+ }
+ return ((MutableResourceId) o).id == this.id;
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 63deb13..a6ef028 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -20,7 +20,6 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
/**
@@ -90,13 +89,9 @@
}
try {
- if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) { // conditionally
- // write
- // commit
- // log
- // record
- transactionProvider.getLogManager().log(txnContext.getLastLogLocator(), txnContext, (byte) (-1), 0,
- LogType.COMMIT, LogActionType.NO_OP, 0, null, null);
+ if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
+ transactionProvider.getLogManager().log(LogType.COMMIT, txnContext, -1, -1, -1, (byte) 0, 0, null,
+ null, txnContext.getLastLogLocator());
}
} catch (ACIDException ae) {
if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -125,5 +120,4 @@
public TransactionProvider getTransactionProvider() {
return transactionProvider;
}
-
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java
index ef843f4..84414b5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java
@@ -20,7 +20,7 @@
import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeLoggerRepository;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLoggerRepository;
import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -35,7 +35,7 @@
private final ITransactionManager transactionManager;
private final IRecoveryManager recoveryManager;
private final TransactionalResourceRepository resourceRepository;
- private final TreeLoggerRepository loggerRepository;
+ private final IndexLoggerRepository loggerRepository;
public TransactionProvider(String id) throws ACIDException {
this.id = id;
@@ -43,7 +43,7 @@
this.logManager = new LogManager(this);
this.lockManager = new LockManager(this);
this.recoveryManager = new RecoveryManager(this);
- this.loggerRepository = new TreeLoggerRepository(this);
+ this.loggerRepository = new IndexLoggerRepository(this);
this.resourceRepository = new TransactionalResourceRepository();
}
@@ -67,7 +67,7 @@
return resourceRepository;
}
- public TreeLoggerRepository getTreeLoggerRepository() {
+ public IndexLoggerRepository getTreeLoggerRepository() {
return loggerRepository;
}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java
index 3e95d6a..c9d01a0 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java
@@ -14,12 +14,12 @@
*/
package edu.uci.ics.asterix.transaction.management.logging;
-import java.util.Map;
import java.util.Random;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
@@ -29,7 +29,7 @@
private static long count = 0;
public void log(TransactionContext context, LogicalLogLocator wMemLSN, int length,
- Map<Object, Object> loggerArguments) throws ACIDException {
+ ReusableLogContentObject reusableLogContentObject) throws ACIDException {
byte[] logContent = getRandomBytes(length);
try {
@@ -66,12 +66,12 @@
return averageContentCreationTime;
}
- public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+ public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
// TODO Auto-generated method stub
}
- public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+ public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
// TODO Auto-generated method stub
}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
index cc266fe..8977b04 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
@@ -29,6 +29,7 @@
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
@@ -180,14 +181,14 @@
}
tempDatasetId.setId(resourceID);
TransactionWorkloadSimulator.lockManager.lock(tempDatasetId, -1, lockMode, context);
- TransactionWorkloadSimulator.logManager.log(memLSN, context, ResourceMgrInfo.BTreeResourceMgrId,
- pageId, logType, logActionType, logSize, logger, null);
+ TransactionWorkloadSimulator.logManager.log(logType, context, resourceID,
+ -1, resourceID, ResourceType.LSM_BTREE, logSize, null, logger, memLSN);
retry = false;
Thread.currentThread().sleep(TransactionWorkloadSimulator.workload.thinkTime);
logCount.incrementAndGet();
logByteCount.addAndGet(logSize
- + TransactionWorkloadSimulator.logManager.getLogManagerProperties().getLogHeaderSize()
- + TransactionWorkloadSimulator.logManager.getLogManagerProperties().getLogChecksumSize());
+ + TransactionWorkloadSimulator.logManager.getLogRecordHelper().getLogHeaderSize(logType)
+ + TransactionWorkloadSimulator.logManager.getLogRecordHelper().getLogChecksumSize());
myLogCount++;
}
} catch (ACIDException acide) {
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java
index 16c5d6b..54bb0b8 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java
@@ -14,11 +14,11 @@
*/
package edu.uci.ics.asterix.transaction.management.test;
-import java.util.Map;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.logging.IResource;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
@@ -39,14 +39,14 @@
}
@Override
- public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+ public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
// TODO Auto-generated method stub
}
@Override
- public void log(TransactionContext context, final LogicalLogLocator memLSN, int logRecordSize,
- Map<Object, Object> loggerArguments) throws ACIDException {
+ public void log(TransactionContext context, final LogicalLogLocator memLSN, int logContentSize,
+ ReusableLogContentObject reusableLogContentObject) throws ACIDException {
byte[] buffer = memLSN.getBuffer().getArray();
byte[] content = logRecordContent.getBytes();
for (int i = 0; i < resource.getId().length; i++) {
@@ -58,7 +58,7 @@
}
@Override
- public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+ public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
// TODO Auto-generated method stub
}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
index 4476955..f6e6d3a 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
@@ -19,21 +19,20 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.logging.IResource;
-import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogger;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager.SystemState;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
public class TransactionSimulator {
@@ -78,15 +77,13 @@
case INCREMENT:
finalValue = currentValue + 1;
int logRecordLength = ((FileLogger) logger).generateLogRecordContent(currentValue, finalValue);
- logManager.log(memLSN, txnContext, FileResourceManager.id, 0, LogType.UPDATE, LogActionType.REDO_UNDO,
- logRecordLength, logger, null);
+ logManager.log(LogType.UPDATE, txnContext, 1, -1, 1, ResourceType.LSM_BTREE, 0, null, logger, memLSN);
((FileResource) resource).increment();
break;
case DECREMENT:
finalValue = currentValue - 1;
logRecordLength = ((FileLogger) logger).generateLogRecordContent(currentValue, finalValue);
- logManager.log(memLSN, txnContext, FileResourceManager.id, 0, LogType.UPDATE, LogActionType.REDO_UNDO,
- logRecordLength, logger, null);
+ logManager.log(LogType.UPDATE, txnContext, 1, -1, 1, ResourceType.LSM_BTREE, 0, null, logger, memLSN);
((FileResource) resource).decrement();
break;
}