Add Checkpoint Test
This change adds a unit test case which validates that
checkpoints do not delete log files that are still required
for recovery, and delete those that are no longer needed.
Change-Id: I4cb4743fe488deb5ad10f65604adc2231948795e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1270
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a806532..88ca736 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -32,11 +32,8 @@
import org.apache.asterix.common.context.TransactionSubsystemProvider;
import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -46,6 +43,7 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -67,7 +65,6 @@
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.util.HyracksConstants;
@@ -115,8 +112,12 @@
private JobId jobId;
private long jobCounter = 0L;
private IHyracksJobletContext jobletCtx;
+ private final String testConfigFileName;
+ private final boolean runHDFS;
- public TestNodeController() throws AsterixException, HyracksException, ACIDException {
+ public TestNodeController(String testConfigFileName, boolean runHDFS) {
+ this.testConfigFileName = testConfigFileName;
+ this.runHDFS = runHDFS;
}
public void init() throws Exception {
@@ -125,7 +126,9 @@
outdir.mkdirs();
// remove library directory
TestLibrarian.removeLibraryDir();
- ExecutionTestUtil.setUp(cleanupOnStart);
+ ExecutionTestUtil.setUp(cleanupOnStart,
+ testConfigFileName == null ? TEST_CONFIG_FILE_NAME : testConfigFileName,
+ ExecutionTestUtil.integrationUtil, runHDFS);
} catch (Throwable th) {
th.printStackTrace();
throw th;
@@ -299,7 +302,7 @@
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
mergePolicyFactory, mergePolicyProperties, filterFields);
TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
- return getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+ return getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo, indexOpDesc);
}
public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
@@ -308,7 +311,7 @@
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
mergePolicyFactory, mergePolicyProperties, filterFields);
TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
- LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo,
+ LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo,
indexOpDesc);
dataflowHelper.create();
}
@@ -359,9 +362,11 @@
return primaryIndexTypeTraits;
}
- public IHyracksTaskContext createTestContext() throws HyracksDataException {
+ public IHyracksTaskContext createTestContext(boolean withMessaging) throws HyracksDataException {
IHyracksTaskContext ctx = TestUtils.create(KB32);
- TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+ if (withMessaging) {
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+ }
ctx = Mockito.spy(ctx);
Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
Mockito.when(ctx.getIOManager())
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
index 5661258..c1399fb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
@@ -19,6 +19,7 @@
package org.apache.asterix.test.common;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -28,11 +29,22 @@
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
public final class TestHelper {
+ private static final String TEST_DIR_BASE_PATH = System.getProperty("user.dir") + File.separator + "target";
+ private static final String[] TEST_DIRS = new String[] { "txnLogDir", "IODevice", "spill_area", "config" };
+
public static boolean isInPrefixList(List<String> prefixList, String s) {
for (String s2 : prefixList) {
if (s.startsWith(s2)) {
@@ -52,11 +64,11 @@
Enumeration<? extends ZipEntry> entries = zipFile.entries();
while (entries.hasMoreElements()) {
ZipEntry entry = entries.nextElement();
- File entryDestination = new File(outputDir, entry.getName());
+ File entryDestination = new File(outputDir, entry.getName());
if (!entry.isDirectory()) {
entryDestination.getParentFile().mkdirs();
try (InputStream in = zipFile.getInputStream(entry);
- OutputStream out = new FileOutputStream(entryDestination)) {
+ OutputStream out = new FileOutputStream(entryDestination)) {
IOUtils.copy(in, out);
}
}
@@ -72,4 +84,43 @@
}
}
}
-}
+
+ public static AsterixConfiguration getConfigurations(String fileName)
+ throws IOException, JAXBException, AsterixException {
+ try (InputStream is = TestHelper.class.getClassLoader().getResourceAsStream(fileName)) {
+ if (is != null) {
+ JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+ Unmarshaller unmarshaller = ctx.createUnmarshaller();
+ return (AsterixConfiguration) unmarshaller.unmarshal(is);
+ } else {
+ throw new AsterixException("Could not find configuration file " + fileName);
+ }
+ }
+ }
+
+ public static void writeConfigurations(AsterixConfiguration ac, String fileName)
+ throws FileNotFoundException, IOException, JAXBException {
+ File configFile = new File(fileName);
+ if (!configFile.exists()) {
+ configFile.getParentFile().mkdirs();
+ configFile.createNewFile();
+ } else {
+ configFile.delete();
+ }
+ try (FileOutputStream os = new FileOutputStream(fileName)) {
+ JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+ Marshaller marshaller = ctx.createMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+ marshaller.marshal(ac, os);
+ }
+ }
+
+ public static void deleteExistingInstanceFiles() {
+ for (String dirName : TEST_DIRS) {
+ File f = new File(joinPath(TEST_DIR_BASE_PATH, dirName));
+ if (FileUtils.deleteQuietly(f)) {
+ System.out.println("Dir " + f.getName() + " deleted");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index a0ef31e..f41666d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.test.dataflow;
-import java.io.File;
import java.util.Collection;
import java.util.Collections;
@@ -27,7 +26,6 @@
import org.apache.asterix.app.data.gen.TupleGenerator;
import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -39,8 +37,8 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
import org.apache.asterix.transaction.management.service.logging.LogReader;
-import org.apache.commons.io.FileUtils;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -63,12 +61,11 @@
public class LogMarkerTest {
- private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
- private static final GenerationFunction[] RECORD_GEN_FUNCTION =
- { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+ private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
+ GenerationFunction.DETERMINISTIC };
private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
private static final ARecordType META_TYPE = null;
private static final GenerationFunction[] META_GEN_FUNCTION = null;
@@ -78,7 +75,6 @@
private static final int NUM_OF_RECORDS = 100000;
private static final int SNAPSHOT_SIZE = 1000;
private static final int DATASET_ID = 101;
- private static final String SPILL_AREA = "target" + File.separator + "spill_area";
private static final String DATAVERSE_NAME = "TestDV";
private static final String DATASET_NAME = "TestDS";
private static final String DATA_TYPE_NAME = "DUMMY";
@@ -86,37 +82,20 @@
@Before
public void setUp() throws Exception {
- System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
System.out.println("SetUp: ");
- File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
- FileUtils.deleteQuietly(f);
- System.out.println("Dir " + f.getName() + " deleted");
- f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
- FileUtils.deleteQuietly(f);
- System.out.println("Dir " + f.getName() + " deleted");
- f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
- FileUtils.deleteQuietly(f);
- System.out.println("Dir " + f.getName() + " deleted");
+ TestHelper.deleteExistingInstanceFiles();
}
@After
public void tearDown() throws Exception {
System.out.println("TearDown");
- File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
- FileUtils.deleteQuietly(f);
- System.out.println("Dir " + f.getName() + " deleted");
- f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
- FileUtils.deleteQuietly(f);
- System.out.println("Dir " + f.getName() + " deleted");
- f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
- FileUtils.deleteQuietly(f);
- System.out.println("Dir " + f.getName() + " deleted");
+ TestHelper.deleteExistingInstanceFiles();
}
@Test
public void testInsertWithSnapshot() {
try {
- TestNodeController nc = new TestNodeController();
+ TestNodeController nc = new TestNodeController(null, false);
nc.init();
Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
@@ -125,7 +104,7 @@
try {
nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
null);
- IHyracksTaskContext ctx = nc.createTestContext();
+ IHyracksTaskContext ctx = nc.createTestContext(true);
nc.newJobId();
ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
new file mode 100644
index 0000000..e3932ca
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.logging;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.configuration.Property;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.transaction.management.service.logging.LogManager;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CheckpointingTest {
+
+ private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+ private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml";
+ private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "target"
+ + File.separator + "config";
+ private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
+ GenerationFunction.DETERMINISTIC };
+ private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+ private static final ARecordType META_TYPE = null;
+ private static final GenerationFunction[] META_GEN_FUNCTION = null;
+ private static final boolean[] UNIQUE_META_FIELDS = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final int[] KEY_INDICATORS = { 0 };
+ private static final int DATASET_ID = 101;
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+ private static final int TXN_LOG_PARTITION_SIZE = StorageUtil.getSizeInBytes(2, StorageUnit.MEGABYTE);
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ // Read default test configurations
+ AsterixConfiguration ac = TestHelper.getConfigurations(DEFAULT_TEST_CONFIG_FILE_NAME);
+ // Set log file size to 2MB
+ ac.getProperty().add(new Property(AsterixTransactionProperties.TXN_LOG_PARTITIONSIZE_KEY,
+ String.valueOf(TXN_LOG_PARTITION_SIZE), ""));
+ // Disable checkpointing by making checkpoint thread wait max wait time
+ ac.getProperty().add(new Property(AsterixTransactionProperties.TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY,
+ String.valueOf(Integer.MAX_VALUE), ""));
+ // Write test config file
+ TestHelper.writeConfigurations(ac, TEST_CONFIG_FILE_PATH);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ System.out.println("TearDown");
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Test
+ public void testDeleteOldLogFiles() {
+ try {
+ TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false);
+ nc.init();
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+ NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ Collections.emptyList(), null, null, null, false, null, false),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
+ try {
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
+ null);
+ IHyracksTaskContext ctx = nc.createTestContext(false);
+ nc.newJobId();
+ ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ // Prepare insert operation
+ AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+
+ LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
+ // Number of log files after node startup should be one
+ int numberOfLogFiles = logManager.getLogFileIds().size();
+ Assert.assertEquals(1, numberOfLogFiles);
+
+ // Low-water mark LSN
+ long lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN();
+ // Low-water mark log file id
+ long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN);
+ // Initial Low-water mark should be in the only available log file
+ Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue());
+
+ // Insert records until a new log file is created
+ while (logManager.getLogFileIds().size() == 1) {
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+
+ // Check if the new low-water mark is still in the initial low-water mark log file
+ lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN();
+ long currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+
+ if (currentLowWaterMarkLogFileId == initialLowWaterMarkFileId) {
+ /*
+ * Make sure checkpoint will not delete the initial log file since
+ * the low-water mark is still in it (i.e. it is still required for
+ * recovery)
+ */
+ int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
+ nc.getTransactionSubsystem().getRecoveryManager().checkpoint(false, logManager.getAppendLSN());
+ int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
+ Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
+
+ /*
+ * Insert records until the low-water mark is not in the initialLowWaterMarkFileId
+ * either because of the asynchronous flush caused by the previous checkpoint or a flush
+ * due to the dataset memory budget getting full.
+ */
+ while (currentLowWaterMarkLogFileId == initialLowWaterMarkFileId) {
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN();
+ currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+ }
+ }
+
+ /*
+ * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
+ * a checkpoint should delete it.
+ */
+ nc.getTransactionSubsystem().getRecoveryManager().checkpoint(false, logManager.getAppendLSN());
+
+ // Validate initialLowWaterMarkFileId was deleted
+ for (Long fileId : logManager.getLogFileIds()) {
+ Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+ }
+
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+ } finally {
+ nc.deInit();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index afc9e4f..0480887 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -18,13 +18,13 @@
*/
package org.apache.asterix.common.config;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
+
import java.util.Map;
import org.apache.hyracks.util.StorageUtil;
-import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
-import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
-
public class AsterixTransactionProperties extends AbstractAsterixProperties {
private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
@@ -33,13 +33,13 @@
private static final String TXN_LOG_BUFFER_PAGESIZE_KEY = "txn.log.buffer.pagesize";
private static final int TXN_LOG_BUFFER_PAGESIZE_DEFAULT = StorageUtil.getSizeInBytes(128, KILOBYTE);
- private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
+ public static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = StorageUtil.getSizeInBytes(256L, MEGABYTE);
private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold";
private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = StorageUtil.getSizeInBytes(64, MEGABYTE);
- private static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency";
+ public static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency";
private static final int TXN_LOG_CHECKPOINT_POLLFREQUENCY_DEFAULT = 120; // 120s
private static final String TXN_LOG_CHECKPOINT_HISTORY_KEY = "txn.log.checkpoint.history";
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 7d61462..947ebc7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -423,7 +423,7 @@
}
}
- private List<Long> getLogFileIds() {
+ public List<Long> getLogFileIds() {
File fileLogDir = new File(logDir);
String[] logFileNames = null;
List<Long> logFileIds = null;