Remove log buffer factory

Change-Id: I814dac8ae5fc49b88470ab115b17bf023494afe9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1765
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 9f86a26..1e920e7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -36,10 +36,8 @@
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
-import org.apache.asterix.transaction.management.service.logging.LogBufferFactory;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
-import org.apache.asterix.transaction.management.service.logging.ReplicatingLogBufferFactory;
 import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -85,10 +83,9 @@
         }
 
         if (replicationEnabled) {
-            this.logManager =
-                    new LogManagerWithReplication(this, ReplicatingLogBufferFactory.INSTANCE, replicationStrategy);
+            this.logManager = new LogManagerWithReplication(this, replicationStrategy);
         } else {
-            this.logManager = new LogManager(this, LogBufferFactory.INSTANCE);
+            this.logManager = new LogManager(this);
         }
         this.recoveryManager = new RecoveryManager(this, serviceCtx);
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 4b24108..1770f1b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -236,7 +236,7 @@
     }
 
     private ComparisonException createLineChangedException(File scriptFile, String lineExpected, String lineActual,
-                                                           int num) {
+            int num) {
         return new ComparisonException("Result for " + scriptFile + " changed at line " + num + ":\n< "
                 + truncateIfLong(lineExpected) + "\n> " + truncateIfLong(lineActual));
     }
@@ -396,7 +396,7 @@
         if (!pattern.matcher(actual.toString()).matches()) {
             // figure out where the problem first occurs...
             StringBuilder builder = new StringBuilder();
-            String [] lines = expected.toString().split("\\n");
+            String[] lines = expected.toString().split("\\n");
             int endOfMatch = 0;
             final StringBuffer actualBuffer = actual.getBuffer();
             for (int i = 0; i < lines.length; i++) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
deleted file mode 100644
index 4a88dcc..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBufferFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.transactions;
-
-@FunctionalInterface
-public interface ILogBufferFactory {
-
-    /**
-     * Create a log buffer
-     *
-     * @param txnSubsystem
-     *            the transaction subsystem
-     * @param logPageSize
-     *            the default log page size
-     * @param flushLsn
-     *            a mutable long used to communicate progress
-     * @return a in instance of ILogBuffer
-     */
-    ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn);
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 831bace..081cf02 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -88,24 +88,32 @@
     ////////////////////////////////////
 
     @Override
-    public void append(ILogRecord logRecord, long appendLSN) {
+    public void append(ILogRecord logRecord, long appendLsn) {
         logRecord.writeLogRecord(appendBuffer);
-        if (logRecord.getLogType() != LogType.FLUSH && logRecord.getLogType() != LogType.WAIT) {
-            logRecord.getTxnCtx().setLastLSN(appendLSN);
+
+        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+                && logRecord.getLogType() != LogType.WAIT) {
+            logRecord.getTxnCtx().setLastLSN(appendLsn);
         }
+
         synchronized (this) {
             appendOffset += logRecord.getLogSize();
             if (IS_DEBUG_MODE) {
                 LOGGER.info("append()| appendOffset: " + appendOffset);
             }
-            if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
-                    || logRecord.getLogType() == LogType.WAIT) {
-                logRecord.isFlushed(false);
-                syncCommitQ.offer(logRecord);
-            }
-            if (logRecord.getLogType() == LogType.FLUSH) {
-                logRecord.isFlushed(false);
-                flushQ.offer(logRecord);
+            if (logRecord.getLogSource() == LogSource.LOCAL) {
+                if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+                        || logRecord.getLogType() == LogType.WAIT) {
+                    logRecord.isFlushed(false);
+                    syncCommitQ.offer(logRecord);
+                }
+                if (logRecord.getLogType() == LogType.FLUSH) {
+                    logRecord.isFlushed(false);
+                    flushQ.offer(logRecord);
+                }
+            } else if (logRecord.getLogSource() == LogSource.REMOTE
+                    && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+                remoteJobsQ.offer(logRecord);
             }
             this.notify();
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
deleted file mode 100644
index 27fdfd1..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import org.apache.asterix.common.transactions.ILogBuffer;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.MutableLong;
-
-public class LogBufferFactory implements ILogBufferFactory {
-    public static final LogBufferFactory INSTANCE = new LogBufferFactory();
-
-    private LogBufferFactory() {
-    }
-
-    @Override
-    public ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) {
-        return new LogBuffer(txnSubsystem, logPageSize, flushLsn);
-    }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index d3e6baf..e5e91e8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -41,7 +41,6 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogBuffer;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -67,7 +66,6 @@
      * Finals
      */
     private final ITransactionSubsystem txnSubsystem;
-    private final ILogBufferFactory logBufferFactory;
     private final LogManagerProperties logManagerProperties;
     private final int numLogPages;
     private final String logDir;
@@ -91,9 +89,8 @@
     private Future<? extends Object> futureLogFlusher;
     protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
 
-    public LogManager(ITransactionSubsystem txnSubsystem, ILogBufferFactory logBufferFactory) {
+    public LogManager(ITransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
-        this.logBufferFactory = logBufferFactory;
         logManagerProperties =
                 new LogManagerProperties(this.txnSubsystem.getTransactionProperties(), this.txnSubsystem.getId());
         logFileSize = logManagerProperties.getLogPartitionSize();
@@ -114,7 +111,7 @@
         flushQ = new LinkedBlockingQueue<>(numLogPages);
         stashQ = new LinkedBlockingQueue<>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
-            emptyQ.offer(logBufferFactory.create(txnSubsystem, logPageSize, flushLSN));
+            emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
         }
         appendLSN.set(initializeLogAnchor(nextLogFileId));
         flushLSN.set(appendLSN.get());
@@ -208,7 +205,7 @@
             }
             // for now, alloc a new buffer for each large page
             // TODO: pool large pages??
-            appendPage = logBufferFactory.create(txnSubsystem, logSize, flushLSN);
+            appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
             appendPage.setFileChannel(appendChannel);
             flushQ.offer(appendPage);
         } else {
@@ -639,8 +636,7 @@
 
 class LogFlusher implements Callable<Boolean> {
     private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
-    private static final ILogBuffer POISON_PILL =
-            LogBufferFactory.INSTANCE.create(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
+    private static final ILogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
     private final LogManager logMgr;//for debugging
     private final LinkedBlockingQueue<ILogBuffer> emptyQ;
     private final LinkedBlockingQueue<ILogBuffer> flushQ;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index f86eea5..dd8fb6e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -24,7 +24,6 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
@@ -38,9 +37,8 @@
     private final IReplicationStrategy replicationStrategy;
     private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet();
 
-    public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, ILogBufferFactory logBufferFactory,
-            IReplicationStrategy replicationStrategy) {
-        super(txnSubsystem, logBufferFactory);
+    public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, IReplicationStrategy replicationStrategy) {
+        super(txnSubsystem);
         this.replicationStrategy = replicationStrategy;
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
deleted file mode 100644
index 0eceb2e..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBuffer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.LogSource;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.transactions.MutableLong;
-
-public class ReplicatingLogBuffer extends LogBuffer {
-    private static final Logger LOGGER = Logger.getLogger(ReplicatingLogBuffer.class.getName());
-
-    public ReplicatingLogBuffer(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) {
-        super(txnSubsystem, logPageSize, flushLsn);
-    }
-
-    @Override
-    public void append(ILogRecord logRecord, long appendLsn) {
-        logRecord.writeLogRecord(appendBuffer);
-
-        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
-                && logRecord.getLogType() != LogType.WAIT) {
-            logRecord.getTxnCtx().setLastLSN(appendLsn);
-        }
-
-        synchronized (this) {
-            appendOffset += logRecord.getLogSize();
-            if (IS_DEBUG_MODE) {
-                LOGGER.info("append()| appendOffset: " + appendOffset);
-            }
-            if (logRecord.getLogSource() == LogSource.LOCAL) {
-                if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
-                        || logRecord.getLogType() == LogType.WAIT) {
-                    logRecord.isFlushed(false);
-                    syncCommitQ.offer(logRecord);
-                }
-                if (logRecord.getLogType() == LogType.FLUSH) {
-                    logRecord.isFlushed(false);
-                    flushQ.offer(logRecord);
-                }
-            } else if (logRecord.getLogSource() == LogSource.REMOTE
-                    && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
-                remoteJobsQ.offer(logRecord);
-            }
-            this.notify();
-        }
-    }
-
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
deleted file mode 100644
index 2abc474..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/ReplicatingLogBufferFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import org.apache.asterix.common.transactions.ILogBuffer;
-import org.apache.asterix.common.transactions.ILogBufferFactory;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.MutableLong;
-
-public class ReplicatingLogBufferFactory implements ILogBufferFactory {
-    public static final ReplicatingLogBufferFactory INSTANCE = new ReplicatingLogBufferFactory();
-
-    private ReplicatingLogBufferFactory() {
-
-    }
-
-    @Override
-    public ILogBuffer create(ITransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLsn) {
-        return new ReplicatingLogBuffer(txnSubsystem, logPageSize, flushLsn);
-    }
-}