checkpoint
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index c6d83cf..bc15644 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -1203,7 +1203,7 @@
ITupleReference libraryTuple = tupleReaderWriter.getTupleFromMetadataEntity(library);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, libraryTuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexException e) {
throw new MetadataException("A library with this name " + library.getDataverseName()
+ " already exists in dataverse '" + library.getDataverseName() + "'.", e);
} catch (Exception e) {
@@ -1342,7 +1342,7 @@
ITupleReference feedPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(feedPolicy);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, feedPolicyTuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexException e) {
throw new MetadataException("A feed policy with this name " + feedPolicy.getPolicyName()
+ " already exists in dataverse '" + feedPolicy.getPolicyName() + "'.", e);
} catch (Exception e) {
@@ -1430,7 +1430,7 @@
ITupleReference feedTuple = tupleReaderWriter.getTupleFromMetadataEntity(feed);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, feedTuple);
- } catch (BTreeDuplicateKeyException e) {
+ } catch (TreeIndexException e) {
throw new MetadataException("A feed with this name " + feed.getFeedName()
+ " already exists in dataverse '" + feed.getDataverseName() + "'.", e);
} catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 99d2e21..c241fc5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -36,10 +36,6 @@
import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
-import edu.uci.ics.asterix.common.transactions.TransactionalResourceManagerRepository;
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
index 23a6d84..cb1124f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
@@ -10,7 +10,7 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.AsterixNodeGroupDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
public class DatasetDataSource extends AqlDataSource {
@@ -65,7 +65,7 @@
schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
}
schemaTypes[n] = itemType;
- domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
+ domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
}
private void initExternalDataset(IAType itemType) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index cc597cc..bcadc18 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -18,7 +18,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
/**
* FeedMetaOperatorDescriptor is a wrapper operator that provides a sanboox like
@@ -218,7 +218,7 @@
}
private boolean handleException(Throwable exception) {
- if (exception instanceof BTreeDuplicateKeyException) {
+ if (exception instanceof TreeIndexException) {
if (resumeOldState) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Received duplicate key exception but that is possible post recovery");
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
index b3f1332..b8b3226 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixAppContextInfo.java
@@ -107,12 +107,12 @@
@Override
public IIndexLifecycleManagerProvider getIndexLifecycleManagerProvider() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
@Override
public IStorageManagerInterface getStorageManagerInterface() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
+ return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index f4b3a20..c7df2f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -32,8 +32,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.logging.LogPage;
import edu.uci.ics.asterix.transaction.management.service.logging.LogPageReader;
import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
deleted file mode 100644
index 755e85b..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ICloseable;
-import edu.uci.ics.asterix.common.transactions.ILogger;
-import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.common.transactions.ReusableLogContentObject;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
-
-public class IndexLogger implements ILogger, ICloseable {
-
- private final Map<Object, Object> jobId2ReusableLogContentObjectRepositoryMap = new ConcurrentHashMap<Object, Object>();
-
- public static final String TREE_INDEX = "TREE_INDEX";
- public static final String TUPLE_REFERENCE = "TUPLE_REFERENCE";
- public static final String TUPLE_WRITER = "TUPLE_WRITER";
- public static final String INDEX_OPERATION = "INDEX_OPERATION";
- public static final String RESOURCE_ID = "RESOURCE_ID";
-
- private final long resourceId;
- private final byte resourceType;
- private final SimpleTupleWriter tupleWriter;
-
- public class BTreeOperationCodes {
- public static final byte INSERT = 0;
- public static final byte DELETE = 1;
- }
-
- public IndexLogger(long resourceId, byte resourceType, IIndex index) {
- this.resourceId = resourceId;
- this.resourceType = resourceType;
- this.tupleWriter = new SimpleTupleWriter();
- }
-
- public synchronized void close(ITransactionContext context) {
- ReusableLogContentObjectRepository txnThreadStateRepository = (ReusableLogContentObjectRepository) jobId2ReusableLogContentObjectRepositoryMap
- .get(context.getJobId());
- txnThreadStateRepository.remove(Thread.currentThread().getId());
- jobId2ReusableLogContentObjectRepositoryMap.remove(context.getJobId());
- }
-
- public void generateLogRecord(ITransactionSubsystem txnSubsystem, ITransactionContext context, int datasetId,
- int PKHashValue, long resourceId, IndexOperation newOperation, ITupleReference newValue,
- IndexOperation oldOperation, ITupleReference oldValue) throws ACIDException {
-
- if (this.resourceId != resourceId) {
- throw new ACIDException("IndexLogger mistach");
- }
-
- context.addCloseableResource(this); // the close method would be called
- // on this TreeLogger instance at
- // the time of transaction
- // commit/abort.
- if (newOperation != IndexOperation.INSERT && newOperation != IndexOperation.DELETE) {
- throw new ACIDException("Loging for Operation " + newOperation + " not supported");
- }
-
- ReusableLogContentObject reusableLogContentObject = null;
- ReusableLogContentObjectRepository reusableLogContentObjectRepository = null;
- reusableLogContentObjectRepository = (ReusableLogContentObjectRepository) jobId2ReusableLogContentObjectRepositoryMap
- .get(context.getJobId());
- if (reusableLogContentObjectRepository == null) {
- synchronized (context) { // threads belonging to different
- // transaction do not need to
- // synchronize amongst them.
- if (reusableLogContentObjectRepository == null) {
- reusableLogContentObjectRepository = new ReusableLogContentObjectRepository();
- jobId2ReusableLogContentObjectRepositoryMap.put(context.getJobId(),
- reusableLogContentObjectRepository);
- }
- }
- }
-
- reusableLogContentObject = reusableLogContentObjectRepository.getObject(Thread.currentThread().getId());
- if (reusableLogContentObject == null) {
- LogicalLogLocator logicalLogLocator = LogUtil.getDummyLogicalLogLocator(txnSubsystem.getLogManager());
- reusableLogContentObject = new ReusableLogContentObject(logicalLogLocator, newOperation, newValue,
- oldOperation, oldValue);
- reusableLogContentObjectRepository.putObject(Thread.currentThread().getId(), reusableLogContentObject);
- } else {
- reusableLogContentObject.setNewOperation(newOperation);
- reusableLogContentObject.setNewValue(newValue);
- reusableLogContentObject.setOldOperation(oldOperation);
- reusableLogContentObject.setOldValue(oldValue);
- }
-
- int logContentSize = 4/*TupleFieldCount*/+ 1/*NewOperation*/+ 4/*newValueLength*/;
- if (newValue != null) {
- logContentSize += tupleWriter.bytesRequired(newValue);
- }
-
- logContentSize += 1/*OldOperation*/+ 4/*oldValueLength*/;
- if (oldValue != null) {
- logContentSize += tupleWriter.bytesRequired(oldValue);
- }
-
- txnSubsystem.getLogManager().log(LogType.UPDATE, context, datasetId, PKHashValue, resourceId, resourceType,
- logContentSize, reusableLogContentObject, this, reusableLogContentObject.getLogicalLogLocator());
- }
-
- @Override
- public void log(ITransactionContext context, LogicalLogLocator logicalLogLocator, int logContentSize,
- ReusableLogContentObject reusableLogContentObject) throws ACIDException {
- int offset = 0;
- int tupleSize = 0;
-
- //tuple field count
- (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, reusableLogContentObject
- .getNewValue().getFieldCount());
- offset += 4;
-
- //new operation
- (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + offset,
- (byte) reusableLogContentObject.getNewOperation().ordinal());
- offset += 1;
-
- //new tuple size
- if (reusableLogContentObject.getNewValue() != null) {
- tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getNewValue());
- }
- (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
- offset += 4;
-
- //new tuple
- if (tupleSize != 0) {
- tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer().getArray(),
- logicalLogLocator.getMemoryOffset() + offset);
- offset += tupleSize;
- }
-
- if (resourceType == ResourceType.LSM_BTREE) {
- //old operation
- (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + offset,
- (byte) reusableLogContentObject.getOldOperation().ordinal());
- offset += 1;
-
- if (reusableLogContentObject.getOldOperation() != IndexOperation.NOOP) {
- //old tuple size
- if (reusableLogContentObject.getOldValue() != null) {
- tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getOldValue());
- } else {
- tupleSize = 0;
- }
- (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
- offset += 4;
-
- if (tupleSize != 0) {
- //old tuple
- tupleWriter.writeTuple(reusableLogContentObject.getOldValue(), logicalLogLocator.getBuffer()
- .getArray(), logicalLogLocator.getMemoryOffset() + offset);
- }
- }
- }
- }
-
- @Override
- public void postLog(ITransactionContext context, ReusableLogContentObject reusableLogContentObject)
- throws ACIDException {
- }
-
- @Override
- public void preLog(ITransactionContext context, ReusableLogContentObject reusableLogContentObject)
- throws ACIDException {
- }
-
- /**
- * Represents a utility class for generating log records corresponding to
- * operations on a ITreeIndex implementation. A TreeLogger instance is thread
- * safe and can be shared across multiple threads that may belong to same or
- * different transactions.
- */
- public class ReusableLogContentObjectRepository {
-
- private final Map<Long, ReusableLogContentObject> id2Object = new HashMap<Long, ReusableLogContentObject>();
-
- public synchronized ReusableLogContentObject getObject(long threadId) {
- return id2Object.get(threadId);
- }
-
- public synchronized void putObject(long threadId, ReusableLogContentObject reusableLogContentObject) {
- this.id2Object.put(threadId, reusableLogContentObject);
- }
-
- public synchronized void remove(long threadId) {
- id2Object.remove(threadId);
- }
- }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
deleted file mode 100644
index 5e380ff..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import java.io.File;
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.FileUtil;
-import edu.uci.ics.asterix.common.transactions.IBuffer;
-import edu.uci.ics.asterix.common.transactions.IFileBasedBuffer;
-import edu.uci.ics.asterix.common.transactions.ILogCursor;
-import edu.uci.ics.asterix.common.transactions.ILogFilter;
-import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
-
-public class LogCursor implements ILogCursor {
-
- private final LogManager logManager;
- private final ILogFilter logFilter;
- private final int logPageSize;
- private IBuffer readOnlyBuffer;
- private LogicalLogLocator logicalLogLocator = null;
- private boolean needReloadBuffer = true;
-
- public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter,
- int logPageSize) throws IOException, ACIDException {
- this.logFilter = logFilter;
- this.logManager = logManager;
- this.logPageSize = logPageSize;
- initialize(startingPhysicalLogLocator);
- }
-
- private void initialize(final PhysicalLogLocator startingPhysicalLogLocator) throws IOException, ACIDException {
- logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), null, 0, logManager);
- }
-
- private IFileBasedBuffer getReadOnlyBuffer(long lsn, int size) throws IOException {
- int fileId = (int) (lsn / logManager.getLogManagerProperties().getLogPartitionSize());
- String filePath = LogUtil.getLogFilePath(logManager.getLogManagerProperties(), fileId);
- File file = new File(filePath);
- if (file.exists()) {
- return FileUtil.getFileBasedBuffer(filePath, lsn
- % logManager.getLogManagerProperties().getLogPartitionSize(), size, logManager
- .getLogManagerProperties().getDiskSectorSize());
- } else {
- return null;
- }
- }
-
- /**
- * Moves the cursor to the next log record that satisfies the configured
- * filter. The parameter nextLogLocator is set to the point to the next log
- * record.
- *
- * @param currentLogLocator
- * @return true if the cursor was successfully moved to the next log record
- * false if there are no more log records that satisfy the
- * configured filter.
- */
- @Override
- public boolean next(LogicalLogLocator currentLogLocator) throws IOException, ACIDException {
-
- //TODO
- //Test the correctness when multiple log files are created
- int integerRead = -1;
- boolean logRecordBeginPosFound = false;
- long bytesSkipped = 0;
-
- //if the lsn to read is greater than or equal to the most recent lsn, then return false
- if (logicalLogLocator.getLsn() >= logManager.getCurrentLsn().get()) {
- return false;
- }
-
- if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
- return readFromMemory(currentLogLocator);
- }
-
- //if the readOnlyBuffer should be reloaded, then load the log page from the log file.
- //needReloadBuffer is set to true if the log record is read from the memory log page.
- if (needReloadBuffer) {
- //log page size doesn't exceed integer boundary
- int offset = (int) (logicalLogLocator.getLsn() % logPageSize);
- long adjustedLSN = logicalLogLocator.getLsn() - offset;
- readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logPageSize);
- logicalLogLocator.setBuffer(readOnlyBuffer);
- logicalLogLocator.setMemoryOffset(offset);
- needReloadBuffer = false;
- }
-
- //check whether the currentOffset has enough space to have new log record by comparing
- //the smallest log record type(which is commit)'s log header.
- while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
- - logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
- integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
- if (integerRead == LogManagerProperties.LOG_MAGIC_NUMBER) {
- logRecordBeginPosFound = true;
- break;
- }
- logicalLogLocator.increaseMemoryOffset(1);
- logicalLogLocator.incrementLsn();
- bytesSkipped++;
- if (bytesSkipped > logPageSize) {
- return false; // the maximum size of a log record is limited to
- // a log page size. If we have skipped as many
- // bytes without finding a log record, it
- // indicates an absence of logs any further.
- }
-
- //if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
- if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
- return next(currentLogLocator); //should read from memory if there is any further log
- }
- }
-
- if (!logRecordBeginPosFound) {
- // need to reload the buffer
- // TODO
- // reduce IO by reading more pages(equal to logBufferSize) at a time.
- long lsnpos = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
-
- readOnlyBuffer = getReadOnlyBuffer(lsnpos, logPageSize);
- if (readOnlyBuffer != null) {
- logicalLogLocator.setBuffer(readOnlyBuffer);
- logicalLogLocator.setLsn(lsnpos);
- logicalLogLocator.setMemoryOffset(0);
- return next(currentLogLocator);
- } else {
- return false;
- }
- }
-
- int logLength = logManager.getLogRecordHelper().getLogRecordSize(
- logManager.getLogRecordHelper().getLogType(logicalLogLocator),
- logManager.getLogRecordHelper().getLogContentSize(logicalLogLocator));
- if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
- if (currentLogLocator == null) {
- currentLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
- }
- currentLogLocator.setLsn(logicalLogLocator.getLsn());
- currentLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset());
- currentLogLocator.setBuffer(readOnlyBuffer);
- logicalLogLocator.incrementLsn(logLength);
- logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() + logLength);
- } else {
- throw new ACIDException("Invalid Log Record found ! checksums do not match :( ");
- }
- return logFilter.accept(readOnlyBuffer, currentLogLocator.getMemoryOffset(), logLength);
- }
-
- /**
- * Returns the filter associated with the cursor.
- *
- * @return ILogFilter
- */
- @Override
- public ILogFilter getLogFilter() {
- return logFilter;
- }
-
- private boolean readFromMemory(LogicalLogLocator currentLogLocator) throws ACIDException, IOException {
- byte[] logRecord = null;
- long lsn = logicalLogLocator.getLsn();
-
- //set the needReloadBuffer to true
- needReloadBuffer = true;
-
- int pageIndex = logManager.getLogPageIndex(lsn);
- logicalLogLocator.setMemoryOffset(logManager.getLogPageOffset(lsn));
-
- // take a lock on the log page so that the page is not flushed to
- // disk interim
- IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
- synchronized (logPage) {
- // need to check again if the log record in the log buffer or has reached the disk
- if (logManager.isMemoryRead(lsn)) {
-
- //find the magic number to identify the start of the log record
- //----------------------------------------------------------------
- int readNumber = -1;
- int logMagicNumber = LogManagerProperties.LOG_MAGIC_NUMBER;
- int bytesSkipped = 0;
- boolean logRecordBeginPosFound = false;
- //check whether the currentOffset has enough space to have new log record by comparing
- //the smallest log record type(which is commit)'s log header.
- while (logicalLogLocator.getMemoryOffset() <= logPageSize
- - logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
- readNumber = logPage.readInt(logicalLogLocator.getMemoryOffset());
- if (readNumber == logMagicNumber) {
- logRecordBeginPosFound = true;
- break;
- }
- logicalLogLocator.increaseMemoryOffset(1);
- logicalLogLocator.incrementLsn();
- bytesSkipped++;
- if (bytesSkipped > logPageSize) {
- return false; // the maximum size of a log record is limited to
- // a log page size. If we have skipped as many
- // bytes without finding a log record, it
- // indicates an absence of logs any further.
- }
- }
-
- if (!logRecordBeginPosFound) {
- // need to read the next log page
- readOnlyBuffer = null;
- logicalLogLocator.setBuffer(null);
- lsn = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
- logicalLogLocator.setLsn(lsn);
- logicalLogLocator.setMemoryOffset(0);
- return next(currentLogLocator);
- }
- //------------------------------------------------------
-
- logicalLogLocator.setBuffer(logPage);
- int logLength = logManager.getLogRecordHelper().getLogRecordSize(
- logManager.getLogRecordHelper().getLogType(logicalLogLocator),
- logManager.getLogRecordHelper().getLogContentSize(logicalLogLocator));
- logRecord = new byte[logLength];
-
- //copy the log record and set the buffer of logical log locator to the buffer of the copied log record.
- System.arraycopy(logPage.getArray(), logicalLogLocator.getMemoryOffset(), logRecord, 0, logLength);
- MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
- readOnlyBuffer = memBuffer;
- logicalLogLocator.setBuffer(readOnlyBuffer);
- logicalLogLocator.setMemoryOffset(0);
-
- if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
- if (currentLogLocator == null) {
- currentLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
- }
- currentLogLocator.setLsn(logicalLogLocator.getLsn());
- currentLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset());
- currentLogLocator.setBuffer(readOnlyBuffer);
- logicalLogLocator.incrementLsn(logLength);
- logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() + logLength);
- } else {
- //if the checksum doesn't match, there is two possible scenario.
- //case1) the log file corrupted: there's nothing we can do for this case during abort.
- //case2) the log record is partially written by another thread. So, we may ignore this log record
- // and continue to read the next log record
- //[NOTICE]
- //Only case2 is handled here.
- logicalLogLocator.incrementLsn(logLength);
- logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() + logLength);
- return next(currentLogLocator);
- }
- return logFilter.accept(readOnlyBuffer, currentLogLocator.getMemoryOffset(), logLength);
-
- } else {
- return next(currentLogLocator);//read from disk
- }
- }
- }
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 9551ba8..419438f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -24,14 +24,9 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-<<<<<<< HEAD
-import java.util.Map;
-import java.util.Set;
-=======
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
->>>>>>> master
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -45,15 +40,8 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
-<<<<<<< HEAD
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
-import edu.uci.ics.asterix.common.transactions.ReusableLogContentObject;
-=======
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
->>>>>>> master
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 00abc86..95ee767 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -45,10 +45,6 @@
import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 4085964..06f5399 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -28,8 +28,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
deleted file mode 100644
index a74b079..0000000
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.logging.test;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
-import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.transactions.DatasetId;
-import edu.uci.ics.asterix.common.transactions.ILockManager;
-import edu.uci.ics.asterix.common.transactions.ILogManager;
-import edu.uci.ics.asterix.common.transactions.ILogRecordHelper;
-import edu.uci.ics.asterix.common.transactions.IResourceManager;
-import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.logging.BasicLogger;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
-
-public class TransactionWorkloadSimulator {
-
- public static ILogManager logManager;
- public static ILockManager lockManager;
- TransactionSubsystem provider;
-
- public static WorkloadProperties workload;
- Transaction[] transactions;
-
- public TransactionWorkloadSimulator(WorkloadProperties workload) {
- this.workload = workload;
- transactions = new Transaction[workload.numActiveThreads];
- }
-
- public void beginWorkload() throws ACIDException, AsterixException {
- provider = new TransactionSubsystem("nc1", null, new AsterixTransactionProperties(
- new AsterixPropertiesAccessor()));
- logManager = provider.getLogManager();
- lockManager = provider.getLockManager();
- provider.getTransactionalResourceRepository().registerTransactionalResourceManager(DummyResourceMgr.id,
- new DummyResourceMgr());
- Transaction[] transactions = new Transaction[workload.numActiveThreads];
- long startTime = System.nanoTime();
- for (int i = 0; i < workload.numActiveThreads; i++) {
- transactions[i] = new Transaction(provider, "Transaction " + (i + 1), workload.singleTransaction);
- transactions[i].start();
- }
- for (int i = 0; i < workload.numActiveThreads; i++) {
- try {
- transactions[i].join();
- } catch (InterruptedException ignore) {
- }
- }
-
- for (int i = 0; i < workload.numActiveThreads; i++) {
- provider.getTransactionManager().commitTransaction(transactions[i].getContext(), new DatasetId(-1), -1);
- }
-
- long endTime = System.nanoTime();
- int totalLogs = Transaction.logCount.get();
- System.out.println(" Total logs :" + totalLogs);
- long timeTaken = ((endTime - startTime) / 1000000);
- System.out.println(" total time :" + timeTaken);
- System.out.println(" throughput :" + totalLogs * 1000 / timeTaken + " logs/sec");
- long totalBytesWritten = Transaction.logByteCount.get();
- System.out.println(" bytes written :" + totalBytesWritten);
- System.out.println(" IO throughput " + totalBytesWritten * 1000 / timeTaken + " bytes/sec");
- System.out.println(" Avg Content Creation time :" + BasicLogger.getAverageContentCreationTime());
- }
-
- public static void main(String args[]) throws AsterixException {
- WorkloadProperties workload = new WorkloadProperties();
- TransactionWorkloadSimulator simulator = new TransactionWorkloadSimulator(workload);
- try {
- simulator.beginWorkload();
- } catch (ACIDException acide) {
- acide.printStackTrace();
- }
-
- }
-}
-
-class SingleTransactionContextFactory {
- private static TransactionContext context;
-
- public static TransactionContext getContext(TransactionSubsystem provider) throws ACIDException {
- if (context == null) {
- context = new TransactionContext(JobIdFactory.generateJobId(), provider);
- }
- return context;
- }
-}
-
-class MultipleTransactionContextFactory {
-
- public static TransactionContext getContext(TransactionSubsystem provider) throws ACIDException {
- return new TransactionContext(JobIdFactory.generateJobId(), provider);
- }
-}
-
-class Transaction extends Thread {
-
- public static AtomicInteger logCount = new AtomicInteger(0);
- public static AtomicLong logByteCount = new AtomicLong(0);
- Random random = new Random();
- BasicLogger logger = new BasicLogger();
- LogicalLogLocator memLSN;
- String name;
- TransactionContext context;
- //private byte[] resourceID = new byte[1];
- private int resourceID;
- private int myLogCount = 0;
- private TransactionSubsystem transactionProvider;
- private ILogManager logManager;
- private DatasetId tempDatasetId = new DatasetId(-1);
-
- public Transaction(TransactionSubsystem provider, String name, boolean singleTransaction) throws ACIDException {
- this.name = name;
- this.transactionProvider = provider;
- if (singleTransaction) {
- context = SingleTransactionContextFactory.getContext(transactionProvider);
- } else {
- context = MultipleTransactionContextFactory.getContext(transactionProvider);
- }
- memLSN = LogUtil.getDummyLogicalLogLocator(transactionProvider.getLogManager());
- logManager = transactionProvider.getLogManager();
- }
-
- public TransactionContext getContext() {
- return context;
- }
-
- @Override
- public void run() {
- if (TransactionWorkloadSimulator.workload.minLogsPerTransactionThread == TransactionWorkloadSimulator.workload.maxLogsPerTransactionThread) {
- TransactionWorkloadSimulator.workload.maxLogsPerTransactionThread++;
- }
- int numLogs = TransactionWorkloadSimulator.workload.minLogsPerTransactionThread
- + random.nextInt(TransactionWorkloadSimulator.workload.maxLogsPerTransactionThread
- - TransactionWorkloadSimulator.workload.minLogsPerTransactionThread);
- int total = 0;
- LogicalLogLocator memLSN = LogUtil.getDummyLogicalLogLocator(logManager);
- if (TransactionWorkloadSimulator.workload.maxLogSize == TransactionWorkloadSimulator.workload.minLogSize) {
- TransactionWorkloadSimulator.workload.maxLogSize++;
- }
- if (TransactionWorkloadSimulator.workload.singleResource) {
- int choice = random.nextInt(2);
- resourceID = (byte) (choice % 2);
- } else {
- random.nextInt(resourceID);
- }
- boolean retry = false;
- byte lockMode = -1;
- try {
- for (int i = 0; i < numLogs - 1; i++) {
- int logSize = TransactionWorkloadSimulator.workload.minLogSize
- + random.nextInt(TransactionWorkloadSimulator.workload.maxLogSize
- - TransactionWorkloadSimulator.workload.minLogSize);
- total += logSize;
-
- byte logType = LogType.UPDATE;
- byte logActionType = LogActionType.REDO_UNDO;
- long pageId = 0;
- if (!retry) {
- lockMode = (byte) (random.nextInt(2));
- }
- tempDatasetId.setId(resourceID);
- TransactionWorkloadSimulator.lockManager.lock(tempDatasetId, -1, lockMode, context);
- TransactionWorkloadSimulator.logManager.log(logType, context, resourceID, -1, resourceID,
- ResourceType.LSM_BTREE, logSize, null, logger, memLSN);
- retry = false;
- Thread.currentThread().sleep(TransactionWorkloadSimulator.workload.thinkTime);
- logCount.incrementAndGet();
- logByteCount.addAndGet(logSize
- + TransactionWorkloadSimulator.logManager.getLogRecordHelper().getLogHeaderSize(logType)
- + TransactionWorkloadSimulator.logManager.getLogRecordHelper().getLogChecksumSize());
- myLogCount++;
- }
- } catch (ACIDException acide) {
- acide.printStackTrace();
- } catch (Exception ie) {
- ie.printStackTrace();
- }
- }
-
-}
-
-class WorkloadProperties {
- public int numActiveThreads = 200;
- public long thinkTime = 0; // (in mesecs)
- public int minLogsPerTransactionThread = 5;
- public int maxLogsPerTransactionThread = 5;
- public int minLogSize = 1024 - 51;
- public int maxLogSize = 1024 - 51;
- public float commitFraction = 0.5f;
- public float rollbackFraction = 0.1f;
- public boolean singleTransaction = false;
- public boolean singleResource = true;
-}
-
-class ResourceMgrInfo {
- public static final byte BTreeResourceMgrId = 1;
- public static final byte MetadataResourceMgrId = 2;
-}
-
-class DummyResourceMgr implements IResourceManager {
-
- public static final byte id = 1;
-
- @Override
- public void redo(ILogRecordHelper logParser, LogicalLogLocator memLSN) throws ACIDException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void undo(ILogRecordHelper logParser, LogicalLogLocator memLSN) throws ACIDException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public byte getResourceManagerId() {
- // TODO Auto-generated method stub
- return 1;
- }
-
-}
\ No newline at end of file
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
deleted file mode 100644
index 8e13356..0000000
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.test;
-
-import java.io.IOException;
-
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.IBuffer;
-import edu.uci.ics.asterix.common.transactions.ILogCursor;
-import edu.uci.ics.asterix.common.transactions.ILogFilter;
-import edu.uci.ics.asterix.common.transactions.ILogManager;
-import edu.uci.ics.asterix.common.transactions.ILogRecordHelper;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.common.transactions.PhysicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
-
-public class LogRecordReader {
-
- ILogManager logManager;
-
- public LogRecordReader(TransactionSubsystem factory) throws ACIDException {
- logManager = factory.getLogManager();
- }
-
- public LogRecordReader(ILogManager logManager) {
- this.logManager = logManager;
- }
-
- public void readLogs(long startingLsn) throws IOException, ACIDException {
- ILogRecordHelper parser = logManager.getLogRecordHelper();
- PhysicalLogLocator lsn = new PhysicalLogLocator(startingLsn, logManager);
- ILogCursor logCursor = logManager.readLog(lsn, new ILogFilter() {
- @Override
- public boolean accept(IBuffer buffer, long startOffset, int length) {
- return true;
- }
- });
- LogicalLogLocator currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
- int logCount = 0;
- while (true) {
- boolean logValidity = logCursor.next(currentLogLocator);
- if (logValidity) {
- System.out.println(++logCount + parser.getLogRecordForDisplay(currentLogLocator));
- } else {
- break;
- }
- }
- }
-
- public void readLogRecord(long lsnValue) throws IOException, ACIDException {
- LogicalLogLocator memLSN = null;
- logManager.readLog(lsnValue, memLSN);
- System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(memLSN));
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws ACIDException, Exception {
- LogManager logManager = new LogManager(null, "nc1");
- LogRecordReader logReader = new LogRecordReader(logManager);
- logReader.readLogs(0);
- }
-
-}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
deleted file mode 100644
index 4bebbc4..0000000
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.test;
-
-import java.io.IOException;
-import java.util.Random;
-
-import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
-import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
-import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.transactions.DatasetId;
-import edu.uci.ics.asterix.common.transactions.ILockManager;
-import edu.uci.ics.asterix.common.transactions.ILogManager;
-import edu.uci.ics.asterix.common.transactions.ILogger;
-import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
-import edu.uci.ics.asterix.common.transactions.IResourceManager;
-import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionManager;
-import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.common.transactions.LogUtil;
-import edu.uci.ics.asterix.common.transactions.LogicalLogLocator;
-import edu.uci.ics.asterix.transaction.management.logging.IResource;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
-
-public class TransactionSimulator {
-
- private ITransactionManager transactionManager;
- private ILogManager logManager;
- private ILockManager lockManager;
- private IRecoveryManager recoveryManager;
- private IResourceManager resourceMgr;
- private ILogger logger;
- private IResource resource;
- private LogicalLogLocator memLSN;
- private TransactionSubsystem transactionProvider;
-
- public TransactionSimulator(IResource resource, IResourceManager resourceMgr) throws ACIDException, AsterixException {
- String id = "nc1";
- transactionProvider = new TransactionSubsystem(id, null, new AsterixTransactionProperties(
- new AsterixPropertiesAccessor()));
- transactionManager = transactionProvider.getTransactionManager();
- logManager = transactionProvider.getLogManager();
- lockManager = transactionProvider.getLockManager();
- recoveryManager = transactionProvider.getRecoveryManager();
- transactionProvider.getTransactionalResourceRepository().registerTransactionalResourceManager(
- resourceMgr.getResourceManagerId(), resourceMgr);
- this.resourceMgr = resourceMgr;
- this.logger = resource.getLogger();
- this.resource = resource;
- memLSN = LogUtil.getDummyLogicalLogLocator(transactionProvider.getLogManager());
- }
-
- public ITransactionContext beginTransaction() throws ACIDException {
- JobId jobId = JobIdFactory.generateJobId();
- return transactionManager.beginTransaction(jobId);
- }
-
- public void executeTransactionOperation(ITransactionContext txnContext, FileResource.CounterOperation operation)
- throws ACIDException {
- // lockManager.lock(txnContext, resourceId, 0);
- ILogManager logManager = transactionProvider.getLogManager();
- int currentValue = ((FileResource) resource).getMemoryCounter();
- int finalValue;
- switch (operation) {
- case INCREMENT:
- finalValue = currentValue + 1;
- int logRecordLength = ((FileLogger) logger).generateLogRecordContent(currentValue, finalValue);
- logManager.log(LogType.UPDATE, txnContext, 1, -1, 1, ResourceType.LSM_BTREE, 0, null, logger, memLSN);
- ((FileResource) resource).increment();
- break;
- case DECREMENT:
- finalValue = currentValue - 1;
- logRecordLength = ((FileLogger) logger).generateLogRecordContent(currentValue, finalValue);
- logManager.log(LogType.UPDATE, txnContext, 1, -1, 1, ResourceType.LSM_BTREE, 0, null, logger, memLSN);
- ((FileResource) resource).decrement();
- break;
- }
-
- }
-
- public void commitTransaction(ITransactionContext context) throws ACIDException {
- transactionManager.commitTransaction(context, new DatasetId(-1), -1);
- }
-
- public void recover() throws ACIDException, IOException {
- recoveryManager.startRecovery(true);
- ((FileResource) resource).sync();
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) throws IOException, ACIDException, AsterixException {
- String fileDir = "testdata";
- String fileName = "counterFile";
- IResource resource = new FileResource(fileDir, fileName);
- FileResourceManager resourceMgr = new FileResourceManager();
- resourceMgr.registerTransactionalResource(resource);
- int existingValue = ((FileResource) resource).getDiskCounter();
-
- TransactionSimulator txnSimulator = new TransactionSimulator(((FileResource) resource), resourceMgr);
- int numTransactions = 2;
- Schedule schedule = new Schedule(numTransactions);
-
- for (int i = 0; i < numTransactions; i++) {
- ITransactionContext context = txnSimulator.beginTransaction();
- txnSimulator.executeTransactionOperation(context, schedule.getOperations()[i]);
- if (schedule.getWillCommit()[i]) {
- txnSimulator.commitTransaction(context);
- }
- }
-
- int finalExpectedValue = existingValue + schedule.getDeltaChange();
- txnSimulator.recover();
- boolean isCorrect = ((FileResource) resource).checkIfValueInSync(finalExpectedValue);
- System.out.println(" Did recovery happen correctly " + isCorrect);
- }
-
-}
-
-class ResourceMgrIds {
-
- public static final byte FileResourceMgrId = 1;
-
-}
-
-class Schedule {
-
- private int numCommittedIncrements;
- private int numCommittedDecrements;
-
- private FileResource.CounterOperation[] operations;
- private Boolean[] willCommit;
-
- public Boolean[] getWillCommit() {
- return willCommit;
- }
-
- private Random random = new Random();
-
- public int getDeltaChange() {
- return numCommittedIncrements - numCommittedDecrements;
- }
-
- public Schedule(int numTransactions) {
- operations = new FileResource.CounterOperation[numTransactions];
- willCommit = new Boolean[numTransactions];
- for (int i = 0; i < numTransactions; i++) {
- willCommit[i] = random.nextBoolean();
- int nextOp = random.nextInt(2);
- FileResource.CounterOperation op = nextOp == 0 ? FileResource.CounterOperation.INCREMENT
- : FileResource.CounterOperation.DECREMENT;
- operations[i] = op;
- if (willCommit[i]) {
- if (op.equals(FileResource.CounterOperation.INCREMENT)) {
- numCommittedIncrements++;
- } else {
- numCommittedDecrements++;
- }
- }
- }
-
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < operations.length; i++) {
- builder.append(" operation " + operations[i]);
- if (willCommit[i]) {
- builder.append(" commit ");
- } else {
- builder.append(" abort ");
- }
- }
-
- builder.append(" number of committed increments " + numCommittedIncrements);
- builder.append(" number of committed decrements " + numCommittedDecrements);
- return new String(builder);
- }
-
- public FileResource.CounterOperation[] getOperations() {
- return operations;
- }
-
-}