Remove log buffer factory
Change-Id: I814dac8ae5fc49b88470ab115b17bf023494afe9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1765
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 9f86a26..1e920e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -36,10 +36,8 @@
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
-import org.apache.asterix.transaction.management.service.logging.LogBufferFactory;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
-import org.apache.asterix.transaction.management.service.logging.ReplicatingLogBufferFactory;
import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -85,10 +83,9 @@
}
if (replicationEnabled) {
- this.logManager =
- new LogManagerWithReplication(this, ReplicatingLogBufferFactory.INSTANCE, replicationStrategy);
+ this.logManager = new LogManagerWithReplication(this, replicationStrategy);
} else {
- this.logManager = new LogManager(this, LogBufferFactory.INSTANCE);
+ this.logManager = new LogManager(this);
}
this.recoveryManager = new RecoveryManager(this, serviceCtx);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 4b24108..1770f1b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -236,7 +236,7 @@
}
private ComparisonException createLineChangedException(File scriptFile, String lineExpected, String lineActual,
- int num) {
+ int num) {
return new ComparisonException("Result for " + scriptFile + " changed at line " + num + ":\n< "
+ truncateIfLong(lineExpected) + "\n> " + truncateIfLong(lineActual));
}
@@ -396,7 +396,7 @@
if (!pattern.matcher(actual.toString()).matches()) {
// figure out where the problem first occurs...
StringBuilder builder = new StringBuilder();
- String [] lines = expected.toString().split("\\n");
+ String[] lines = expected.toString().split("\\n");
int endOfMatch = 0;
final StringBuffer actualBuffer = actual.getBuffer();
for (int i = 0; i < lines.length; i++) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
deleted file mode 100644
index 4a88dcc..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.transactions;
-
-@FunctionalInterface
-public interface ILogBufferFactory {
-
- /**
- * Create a log buffer
- *
- * @param txnSubsystem
- * the transaction subsystem
- * @param logPageSize
- * the default log page size
- * @param flushLsn
- * a mutable long used to communicate progress
- * @return a in instance of ILogBuffer
- */
- ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn);
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 831bace..081cf02 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -88,24 +88,32 @@
////////////////////////////////////
@Override
- public void append(ILogRecord logRecord, long appendLSN) {
+ public void append(ILogRecord logRecord, long appendLsn) {
logRecord.writeLogRecord(appendBuffer);
- if (logRecord.getLogType() != LogType.FLUSH && logRecord.getLogType() != LogType.WAIT) {
- logRecord.getTxnCtx().setLastLSN(appendLSN);
+
+ if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+ && logRecord.getLogType() != LogType.WAIT) {
+ logRecord.getTxnCtx().setLastLSN(appendLsn);
}
+
synchronized (this) {
appendOffset += logRecord.getLogSize();
if (IS_DEBUG_MODE) {
LOGGER.info("append()| appendOffset: " + appendOffset);
}
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
- || logRecord.getLogType() == LogType.WAIT) {
- logRecord.isFlushed(false);
- syncCommitQ.offer(logRecord);
- }
- if (logRecord.getLogType() == LogType.FLUSH) {
- logRecord.isFlushed(false);
- flushQ.offer(logRecord);
+ if (logRecord.getLogSource() == LogSource.LOCAL) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+ || logRecord.getLogType() == LogType.WAIT) {
+ logRecord.isFlushed(false);
+ syncCommitQ.offer(logRecord);
+ }
+ if (logRecord.getLogType() == LogType.FLUSH) {
+ logRecord.isFlushed(false);
+ flushQ.offer(logRecord);
+ }
+ } else if (logRecord.getLogSource() == LogSource.REMOTE
+ && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+ remoteJobsQ.offer(logRecord);
}
this.notify();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
deleted file mode 100644
index 27fdfd1..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import org.apache.asterix.common.transactions.ILogBuffer;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.MutableLong;
-
-public class LogBufferFactory implements ILogBufferFactory {
- public static final LogBufferFactory INSTANCE = new LogBufferFactory();
-
- private LogBufferFactory() {
- }
-
- @Override
- public ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) {
- return new LogBuffer(txnSubsystem, logPageSize, flushLsn);
- }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index d3e6baf..e5e91e8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -41,7 +41,6 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ILogBuffer;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogReader;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -67,7 +66,6 @@
* Finals
*/
private final ITransactionSubsystem txnSubsystem;
- private final ILogBufferFactory logBufferFactory;
private final LogManagerProperties logManagerProperties;
private final int numLogPages;
private final String logDir;
@@ -91,9 +89,8 @@
private Future<? extends Object> futureLogFlusher;
protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
- public LogManager(ITransactionSubsystem txnSubsystem, ILogBufferFactory logBufferFactory) {
+ public LogManager(ITransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
- this.logBufferFactory = logBufferFactory;
logManagerProperties =
new LogManagerProperties(this.txnSubsystem.getTransactionProperties(), this.txnSubsystem.getId());
logFileSize = logManagerProperties.getLogPartitionSize();
@@ -114,7 +111,7 @@
flushQ = new LinkedBlockingQueue<>(numLogPages);
stashQ = new LinkedBlockingQueue<>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
- emptyQ.offer(logBufferFactory.create(txnSubsystem, logPageSize, flushLSN));
+ emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
}
appendLSN.set(initializeLogAnchor(nextLogFileId));
flushLSN.set(appendLSN.get());
@@ -208,7 +205,7 @@
}
// for now, alloc a new buffer for each large page
// TODO: pool large pages??
- appendPage = logBufferFactory.create(txnSubsystem, logSize, flushLSN);
+ appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
appendPage.setFileChannel(appendChannel);
flushQ.offer(appendPage);
} else {
@@ -639,8 +636,7 @@
class LogFlusher implements Callable<Boolean> {
private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
- private static final ILogBuffer POISON_PILL =
- LogBufferFactory.INSTANCE.create(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
+ private static final ILogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<ILogBuffer> emptyQ;
private final LinkedBlockingQueue<ILogBuffer> flushQ;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index f86eea5..dd8fb6e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
@@ -38,9 +37,8 @@
private final IReplicationStrategy replicationStrategy;
private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet();
- public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, ILogBufferFactory logBufferFactory,
- IReplicationStrategy replicationStrategy) {
- super(txnSubsystem, logBufferFactory);
+ public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, IReplicationStrategy replicationStrategy) {
+ super(txnSubsystem);
this.replicationStrategy = replicationStrategy;
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
deleted file mode 100644
index 0eceb2e..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.LogSource;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.transactions.MutableLong;
-
-public class ReplicatingLogBuffer extends LogBuffer {
- private static final Logger LOGGER = Logger.getLogger(ReplicatingLogBuffer.class.getName());
-
- public ReplicatingLogBuffer(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) {
- super(txnSubsystem, logPageSize, flushLsn);
- }
-
- @Override
- public void append(ILogRecord logRecord, long appendLsn) {
- logRecord.writeLogRecord(appendBuffer);
-
- if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
- && logRecord.getLogType() != LogType.WAIT) {
- logRecord.getTxnCtx().setLastLSN(appendLsn);
- }
-
- synchronized (this) {
- appendOffset += logRecord.getLogSize();
- if (IS_DEBUG_MODE) {
- LOGGER.info("append()| appendOffset: " + appendOffset);
- }
- if (logRecord.getLogSource() == LogSource.LOCAL) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
- || logRecord.getLogType() == LogType.WAIT) {
- logRecord.isFlushed(false);
- syncCommitQ.offer(logRecord);
- }
- if (logRecord.getLogType() == LogType.FLUSH) {
- logRecord.isFlushed(false);
- flushQ.offer(logRecord);
- }
- } else if (logRecord.getLogSource() == LogSource.REMOTE
- && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
- remoteJobsQ.offer(logRecord);
- }
- this.notify();
- }
- }
-
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
deleted file mode 100644
index 2abc474..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import org.apache.asterix.common.transactions.ILogBuffer;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.MutableLong;
-
-public class ReplicatingLogBufferFactory implements ILogBufferFactory {
- public static final ReplicatingLogBufferFactory INSTANCE = new ReplicatingLogBufferFactory();
-
- private ReplicatingLogBufferFactory() {
-
- }
-
- @Override
- public ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) {
- return new ReplicatingLogBuffer(txnSubsystem, logPageSize, flushLsn);
- }
-}