[ASTERIXDB-2506][STO] Checkpoint Datasets
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add new config to specify dataset checkpoint interval
and default its value to 10 minutes.
- Flush datasets which were not flushed for the dataset
checkpoint interval.
- Run dataset checkpoint logic as part of the CheckpointThread.
- Improve dataset async flush API to accept any LSM index predicate.
- Cleanup exception handling in CheckpoinThread.
- Add test case for dataset checkpoint.
Change-Id: I38a73a43a4b1b7d3a8ac79dd579ed4ef8c9c6a9b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3106
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/StorageExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/StorageExecutionTest.java
new file mode 100644
index 0000000..8c050a5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/StorageExecutionTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs storage runtime tests
+ */
+@RunWith(Parameterized.class)
+public class StorageExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-storage.conf";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "StorageExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("only_testsuite_storage.xml", "testsuite_storage.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public StorageExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-storage.conf
new file mode 100644
index 0000000..b6bed24
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-storage.conf
@@ -0,0 +1,57 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.pagesize=32KB
+storage.buffercache.size=21MB
+storage.memorycomponent.globalbudget=512MB
+storage.compression.block=snappy
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+
+[common]
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+messaging.frame.size=4096
+messaging.frame.count=512
+txn.log.checkpoint.pollfrequency=10
+txn.dataset.checkpoint.interval=10
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_testsuite_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_testsuite_storage.xml
new file mode 100644
index 0000000..bd34ae0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_testsuite_storage.xml
@@ -0,0 +1,20 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp" SourceLocation="true">
+</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.1.ddl.sqlpp
new file mode 100644
index 0000000..de04c53
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.1.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE KeyType IF NOT EXISTS AS { id: int };
+CREATE DATASET ds(KeyType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.2.update.sqlpp
new file mode 100644
index 0000000..e7c84d4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.2.update.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+UPSERT INTO ds ({"id": 1});
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.3.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.3.pollquery.sqlpp
new file mode 100644
index 0000000..821b2f3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.3.pollquery.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+-- polltimeoutsecs=180
+SET `import-private-functions` `true`;
+USE test;
+SELECT VALUE COUNT(*)
+FROM storage_components('test','ds') resource
+WHERE array_length(resource.components) > 0;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.4.ddl.sqlpp
new file mode 100644
index 0000000..d195714
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/storage/dataset-checkpoint/dataset-checkpoint.4.ddl.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 5faf4d8..da464c7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -38,6 +38,7 @@
"storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
+ "txn\.dataset\.checkpoint\.interval" : 600,
"txn\.job\.recovery\.memorysize" : 67108864,
"txn\.lock\.escalationthreshold" : 1000,
"txn\.lock\.shrinktimer" : 5000,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index e30c879..fa8f48e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -38,6 +38,7 @@
"storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
+ "txn\.dataset\.checkpoint\.interval" : 600,
"txn\.job\.recovery\.memorysize" : 67108864,
"txn\.lock\.escalationthreshold" : 1000,
"txn\.lock\.shrinktimer" : 5000,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index ce5add1..801900c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -38,6 +38,7 @@
"storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
+ "txn\.dataset\.checkpoint\.interval" : 600,
"txn\.job\.recovery\.memorysize" : 67108864,
"txn\.lock\.escalationthreshold" : 1000,
"txn\.lock\.shrinktimer" : 5000,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/storage/dataset-checkpoint/dataset-checkpoint.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/storage/dataset-checkpoint/dataset-checkpoint.3.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/storage/dataset-checkpoint/dataset-checkpoint.3.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_storage.xml
new file mode 100644
index 0000000..4f2797f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_storage.xml
@@ -0,0 +1,27 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp" SourceLocation="true">
+ <test-group name="storage">
+ <test-case FilePath="storage">
+ <compilation-unit name="dataset-checkpoint">
+ <output-dir compare="Text">dataset-checkpoint</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index d18b6ab..954c209 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.api;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.IndexInfo;
@@ -26,6 +27,7 @@
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
@@ -56,12 +58,12 @@
void flushAllDatasets() throws HyracksDataException;
/**
- * Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN.
+ * Schedules asynchronous flush on indexes matching the predicate {@code indexPredicate}
*
- * @param nonSharpCheckpointTargetLSN
+ * @param indexPredicate
* @throws HyracksDataException
*/
- void scheduleAsyncFlushForLaggingDatasets(long nonSharpCheckpointTargetLSN) throws HyracksDataException;
+ void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate) throws HyracksDataException;
/**
* creates (if necessary) and returns the dataset info.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
index 6ff51ca..fb6ca6b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
@@ -26,6 +26,7 @@
import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
@@ -35,6 +36,10 @@
public class TransactionProperties extends AbstractProperties {
public enum Option implements IOption {
+ TXN_DATASET_CHECKPOINT_INTERVAL(
+ POSITIVE_INTEGER,
+ (int) TimeUnit.MINUTES.toSeconds(10),
+ "The interval (in seconds) after which a dataset is considered idle and persisted to disk"),
TXN_LOG_BUFFER_NUMPAGES(POSITIVE_INTEGER, 8, "The number of pages in the transaction log tail"),
TXN_LOG_BUFFER_PAGESIZE(
INTEGER_BYTE_UNIT,
@@ -173,4 +178,8 @@
public long getJobRecoveryMemorySize() {
return accessor.getLong(Option.TXN_JOB_RECOVERY_MEMORYSIZE);
}
+
+ public int getDatasetCheckpointInterval() {
+ return accessor.getInt(Option.TXN_DATASET_CHECKPOINT_INTERVAL);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 486cd45..61ffadc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.IDatasetMemoryManager;
@@ -390,35 +391,27 @@
}
@Override
- public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
- //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
+ public synchronized void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate)
+ throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
- // check all partitions
synchronized (opTracker) {
- scheduleAsyncFlushForLaggingDatasetPartition(dsr, opTracker, targetLSN);
+ asyncFlush(dsr, opTracker, indexPredicate);
}
}
}
}
- private void scheduleAsyncFlushForLaggingDatasetPartition(DatasetResource dsr,
- PrimaryIndexOperationTracker opTracker, long targetLSN) throws HyracksDataException {
- int partition = opTracker.getPartition();
+ private void asyncFlush(DatasetResource dsr, PrimaryIndexOperationTracker opTracker,
+ Predicate<ILSMIndex> indexPredicate) throws HyracksDataException {
+ final int partition = opTracker.getPartition();
for (ILSMIndex lsmIndex : dsr.getDatasetInfo().getDatasetPartitionOpenIndexes(partition)) {
LSMIOOperationCallback ioCallback = (LSMIOOperationCallback) lsmIndex.getIOOperationCallback();
- if (!(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
- || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
- long firstLSN = ioCallback.getPersistenceLsn();
- if (firstLSN < targetLSN) {
- LOGGER.info("Checkpoint flush dataset {} partition {}", dsr.getDatasetID(), partition);
- opTracker.setFlushOnExit(true);
- if (opTracker.getNumActiveOperations() == 0) {
- // No Modify operations currently, we need to trigger the flush and we can do so safely
- opTracker.flushIfRequested();
- }
- break;
- }
+ if (needsFlush(opTracker, lsmIndex, ioCallback) && indexPredicate.test(lsmIndex)) {
+ LOGGER.info("Async flushing {}", opTracker);
+ opTracker.setFlushOnExit(true);
+ opTracker.flushIfNeeded();
+ break;
}
}
}
@@ -623,4 +616,10 @@
throw new IllegalStateException(e);
}
}
+
+ private static boolean needsFlush(PrimaryIndexOperationTracker opTracker, ILSMIndex lsmIndex,
+ LSMIOOperationCallback ioCallback) throws HyracksDataException {
+ return !(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
+ || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit());
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 59e19ca..806a6d4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -51,9 +51,11 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+@NotThreadSafe
public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
private static final Logger LOGGER = LogManager.getLogger();
private final int partition;
@@ -64,6 +66,7 @@
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>();
+ private long lastFlushTime = System.nanoTime();
public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
@@ -213,6 +216,7 @@
ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
accessor.getOpContext().setParameters(flushMap);
ILSMIOOperation flush = accessor.scheduleFlush();
+ lastFlushTime = System.nanoTime();
scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush);
flush.addCompleteListener(this);
}
@@ -276,6 +280,15 @@
return partition;
}
+ public long getLastFlushTime() {
+ return lastFlushTime;
+ }
+
+ @Override
+ public String toString() {
+ return "Dataset (" + datasetID + "), Partition (" + partition + ")";
+ }
+
private boolean canSafelyFlush() {
return numActiveOperations.get() == 0;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
index 72f987a..a6951b3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
@@ -26,6 +26,7 @@
private final int lsnThreshold;
private final int pollFrequency;
private final int historyToKeep;
+ private final int datasetCheckpointInterval;
public CheckpointProperties(TransactionProperties txnProperties, String nodeId) {
// Currently we use the log files directory for checkpoints
@@ -33,6 +34,7 @@
lsnThreshold = txnProperties.getCheckpointLSNThreshold();
pollFrequency = txnProperties.getCheckpointPollFrequency();
historyToKeep = txnProperties.getCheckpointHistory();
+ datasetCheckpointInterval = txnProperties.getDatasetCheckpointInterval();
}
public int getLsnThreshold() {
@@ -51,10 +53,15 @@
return checkpointDirPath;
}
+ public int getDatasetCheckpointInterval() {
+ return datasetCheckpointInterval;
+ }
+
@Override
public String toString() {
return "{\"class\" : \"" + getClass().getSimpleName() + "\", \"checkpoint-dir-path\" : \"" + checkpointDirPath
+ "\", \"lsn-threshold\" : " + lsnThreshold + ", \"poll-frequency\" : " + pollFrequency
- + ", \"history-to-keep\" : " + historyToKeep + " }";
+ + ", \"history-to-keep\" : " + historyToKeep + ", \"dataset-checkpoint-interval\" : "
+ + datasetCheckpointInterval + "}";
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 36cea55..ac652e3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -58,4 +58,11 @@
* @param id
*/
void completed(TxnId id);
+
+ /**
+ * Checkpoints idle datasets by flushing their in-memory component to disk if needed.
+ *
+ * @throws HyracksDataException
+ */
+ void checkpointIdleDatasets() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index ce523db..ba945be 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -18,19 +18,23 @@
*/
package org.apache.asterix.transaction.management.service.recovery;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* An implementation of {@link ICheckpointManager} that defines the logic
* of checkpoints.
@@ -39,10 +43,12 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final long NO_SECURED_LSN = -1L;
+ private final long datasetCheckpointInterval;
private final Map<TxnId, Long> securedLSNs;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
super(txnSubsystem, checkpointProperties);
+ datasetCheckpointInterval = checkpointProperties.getDatasetCheckpointInterval();
securedLSNs = new HashMap<>();
}
@@ -78,9 +84,8 @@
boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
if (!checkpointSucceeded) {
// Flush datasets with indexes behind target checkpoint LSN
- IDatasetLifecycleManager datasetLifecycleManager =
- txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
- datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
+ final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
+ dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
}
capture(minFirstLSN, false);
if (checkpointSucceeded) {
@@ -100,7 +105,31 @@
securedLSNs.remove(id);
}
+ @Override
+ public synchronized void checkpointIdleDatasets() throws HyracksDataException {
+ final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
+ dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
+ }
+
private synchronized long getMinSecuredLSN() {
return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
}
+
+ private Predicate<ILSMIndex> newIdleDatasetPredicate() {
+ final long currentTime = System.nanoTime();
+ return lsmIndex -> {
+ if (lsmIndex.isPrimaryIndex()) {
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
+ return currentTime - opTracker.getLastFlushTime() >= datasetCheckpointInterval;
+ }
+ return false;
+ };
+ }
+
+ private Predicate<ILSMIndex> newLaggingDatasetPredicate(long checkpointTargetLSN) {
+ return lsmIndex -> {
+ final LSMIOOperationCallback ioCallback = (LSMIOOperationCallback) lsmIndex.getIOOperationCallback();
+ return ioCallback.getPersistenceLsn() < checkpointTargetLSN;
+ };
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 1992057..446eec5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -20,8 +20,6 @@
import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -59,29 +57,22 @@
while (shouldRun) {
try {
sleep(checkpointTermInSecs * 1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (!shouldRun) {
- return;
- }
- if (lastCheckpointLSN == -1) {
- try {
+ if (!shouldRun) {
+ return;
+ }
+ if (lastCheckpointLSN == -1) {
//Since the system just started up after sharp checkpoint,
//last checkpoint LSN is considered as the min LSN of the current log partition
lastCheckpointLSN = logManager.getReadableSmallestLSN();
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Error getting smallest readable LSN", e);
- lastCheckpointLSN = 0;
}
- }
+ checkpointManager.checkpointIdleDatasets();
- //1. get current log LSN
- currentLogLSN = logManager.getAppendLSN();
+ //1. get current log LSN
+ currentLogLSN = logManager.getAppendLSN();
- //2. if current log LSN - previous checkpoint > threshold, do checkpoint
- if (currentLogLSN - lastCheckpointLSN > lsnThreshold) {
- try {
+ //2. if current log LSN - previous checkpoint > threshold, do checkpoint
+ if (currentLogLSN - lastCheckpointLSN > lsnThreshold) {
+
// in check point:
//1. get minimum first LSN (MFL) from open indexes.
//2. if current MinFirstLSN < targetCheckpointLSN, schedule async flush for any open index witch has first LSN < force flush delta
@@ -94,9 +85,13 @@
if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
lastCheckpointLSN = currentCheckpointAttemptMinLSN;
}
- } catch (HyracksDataException e) {
- LOGGER.log(Level.ERROR, "Error during checkpoint", e);
+
}
+ } catch (InterruptedException e) {
+ LOGGER.info("Checkpoint thread interrupted", e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOGGER.error("Error during checkpoint", e);
}
}
}