ASTERIXDB-1696 - Big Object Spanning Log File Boundary
- Big objects spanning log file boundary produces
ArrayIndexOutOfBoundsException. Refactored large log page logic to
correctly handle big objects at log file boundaries.
- Add regression test
Change-Id: Ifd5ac08a8bcf4a1e0804aa05bd7e52169a0cf1bc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1296
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
index 15bb500..8ed833f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTest.java
@@ -117,6 +117,7 @@
@Test
public void test() throws Exception {
+ int repeat = ExecutionTest.repeat * tcCtx.getRepeat();
try {
for (int i = 1; i <= repeat; i++) {
if (repeat > 1) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index d0e342c..7a19102 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -6731,7 +6731,7 @@
<output-dir compare="Text">big_object_load_only_20M</output-dir>
</compilation-unit>
</test-case>
- <test-case FilePath="big-object">
+ <test-case FilePath="big-object" repeat="5">
<compilation-unit name="big_object_feed_20M">
<output-dir compare="Text">big_object_feed_20M</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 29c0afd..741518a 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -66,6 +66,7 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
import org.apache.http.util.EntityUtils;
+import org.apache.hyracks.util.StorageUtil;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -85,6 +86,7 @@
private static final Pattern POLL_TIMEOUT_PATTERN =
Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
+ public static final int TRUNCATE_THRESHOLD = 16384;
private static Method managixExecuteMethod = null;
private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>();
@@ -147,30 +149,24 @@
if (lineExpected.isEmpty()) {
continue;
}
- throw new ComparisonException(
- "Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected + "\n> ");
+ throwLineChanged(scriptFile, lineExpected, "<EOF>", num);
}
// Comparing result equality but ignore "Time"-prefixed fields. (for metadata tests.)
String[] lineSplitsExpected = lineExpected.split("Time");
String[] lineSplitsActual = lineActual.split("Time");
if (lineSplitsExpected.length != lineSplitsActual.length) {
- throw new ComparisonException(
- "Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected
- + "\n> " + lineActual);
+ throwLineChanged(scriptFile, lineExpected, lineActual, num);
}
if (!equalStrings(lineSplitsExpected[0], lineSplitsActual[0], regex)) {
- throw new ComparisonException(
- "Result for " + scriptFile + " changed at line " + num + ":\n< " + lineExpected
- + "\n> " + lineActual);
+ throwLineChanged(scriptFile, lineExpected, lineActual, num);
}
for (int i = 1; i < lineSplitsExpected.length; i++) {
String[] splitsByCommaExpected = lineSplitsExpected[i].split(",");
String[] splitsByCommaActual = lineSplitsActual[i].split(",");
if (splitsByCommaExpected.length != splitsByCommaActual.length) {
- throw new ComparisonException("Result for " + scriptFile + " changed at line " + num + ":\n< "
- + lineExpected + "\n> " + lineActual);
+ throwLineChanged(scriptFile, lineExpected, lineActual, num);
}
for (int j = 1; j < splitsByCommaExpected.length; j++) {
if (splitsByCommaExpected[j].indexOf("DatasetId") >= 0) {
@@ -179,9 +175,7 @@
continue;
}
if (!equalStrings(splitsByCommaExpected[j], splitsByCommaActual[j], regex)) {
- throw new ComparisonException(
- "Result for " + scriptFile + " changed at line " + num + ":\n< "
- + lineExpected + "\n> " + lineActual);
+ throwLineChanged(scriptFile, lineExpected, lineActual, num);
}
}
}
@@ -203,6 +197,25 @@
}
+ private void throwLineChanged(File scriptFile, String lineExpected, String lineActual, int num)
+ throws ComparisonException {
+ throw new ComparisonException(
+ "Result for " + scriptFile + " changed at line " + num + ":\n< "
+ + truncateIfLong(lineExpected) + "\n> " + truncateIfLong(lineActual));
+ }
+
+ private String truncateIfLong(String string) {
+ if (string.length() < TRUNCATE_THRESHOLD) {
+ return string;
+ }
+ final StringBuilder truncatedString = new StringBuilder(string);
+ truncatedString.setLength(TRUNCATE_THRESHOLD);
+ truncatedString.append("\n<truncated ")
+ .append(StorageUtil.toHumanReadableSize(string.length() - TRUNCATE_THRESHOLD))
+ .append("...>");
+ return truncatedString.toString();
+ }
+
private boolean equalStrings(String expected, String actual, boolean regexMatch) {
String[] rowsExpected = expected.split("\n");
String[] rowsActual = actual.split("\n");
diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestCaseContext.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestCaseContext.java
index ac3fe3c..a696a0a 100644
--- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestCaseContext.java
+++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestCaseContext.java
@@ -126,6 +126,10 @@
return testCase;
}
+ public int getRepeat() {
+ return testCase.getRepeat().intValue();
+ }
+
public List<TestFileContext> getFilesInDir(String basePath, String dirName, boolean withType) {
List<TestFileContext> testFileCtxs = new ArrayList<TestFileContext>();
diff --git a/asterixdb/asterix-test-framework/src/main/resources/Catalog.xsd b/asterixdb/asterix-test-framework/src/main/resources/Catalog.xsd
index a18f41c..f4ba0ee 100644
--- a/asterixdb/asterix-test-framework/src/main/resources/Catalog.xsd
+++ b/asterixdb/asterix-test-framework/src/main/resources/Catalog.xsd
@@ -164,7 +164,6 @@
<!-- This name is always equal to the name of the test case -->
<xs:attribute name="name" type="xs:string" use="required"/>
-
</xs:complexType>
</xs:element>
@@ -180,6 +179,7 @@
<xs:attribute name="FilePath" type="test:SimplifiedRelativeFilePath" use="required"/>
<xs:attribute name="category" type="test:category-enum"/>
+ <xs:attribute name="repeat" type="xs:positiveInteger" default="1" />
</xs:complexType>
<!-- category-enum type -->
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 9a66aa5..7f74f52 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -73,6 +73,7 @@
private LogFlusher logFlusher;
private Future<? extends Object> futureLogFlusher;
private static final long SMALLEST_LOG_FILE_ID = 0;
+ private static final int INITIAL_LOG_SIZE = 0;
private final String nodeId;
protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
private final FlushLogsLogger flushLogsLogger;
@@ -107,7 +108,7 @@
LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
}
appendChannel = getFileChannel(appendLSN.get(), false);
- getAndInitNewPage();
+ getAndInitNewPage(INITIAL_LOG_SIZE);
logFlusher = new LogFlusher(this, emptyQ, flushQ);
futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
if (!flushLogsLogger.isAlive()) {
@@ -155,17 +156,13 @@
* appendLSN = the first LSN of the next log file), we do not allow a log to be
* written at the last offset of the current file.
*/
- if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() >= logFileSize) {
- prepareNextLogFile();
- appendPage.isFull(true);
- getAndInitNewPage();
- } else if (!appendPage.hasSpace(logRecord.getLogSize())) {
- appendPage.isFull(true);
- if (logRecord.getLogSize() > logPageSize) {
- getAndInitNewLargePage(logRecord.getLogSize());
- } else {
- getAndInitNewPage();
+ final int logSize = logRecord.getLogSize();
+ if (!appendPage.hasSpace(logSize)) {
+ if (getLogFileOffset(appendLSN.get()) + logSize >= logFileSize) {
+ prepareNextLogFile();
}
+ appendPage.isFull(true);
+ getAndInitNewPage(logSize);
}
appendPage.append(logRecord, appendLSN.get());
@@ -175,29 +172,29 @@
if (logRecord.isMarker()) {
logRecord.logAppended(appendLSN.get());
}
- appendLSN.addAndGet(logRecord.getLogSize());
+ appendLSN.addAndGet(logSize);
}
- protected void getAndInitNewLargePage(int logSize) {
- // for now, alloc a new buffer for each large page
- // TODO: pool large pages
- appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
- appendPage.setFileChannel(appendChannel);
- flushQ.offer(appendPage);
- }
-
- protected void getAndInitNewPage() {
- appendPage = null;
- while (appendPage == null) {
- try {
- appendPage = emptyQ.take();
- } catch (InterruptedException e) {
- //ignore
+ protected void getAndInitNewPage(int logSize) {
+ if (logSize > logPageSize) {
+ // for now, alloc a new buffer for each large page
+ // TODO: pool large pages
+ appendPage = new LogBuffer(txnSubsystem, logSize, flushLSN);
+ appendPage.setFileChannel(appendChannel);
+ flushQ.offer(appendPage);
+ } else {
+ appendPage = null;
+ while (appendPage == null) {
+ try {
+ appendPage = emptyQ.take();
+ } catch (InterruptedException e) {
+ //ignore
+ }
}
+ appendPage.reset();
+ appendPage.setFileChannel(appendChannel);
+ flushQ.offer(appendPage);
}
- appendPage.reset();
- appendPage.setFileChannel(appendChannel);
- flushQ.offer(appendPage);
}
protected void prepareNextLogFile() {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 0c4cb88..f27c13f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -98,17 +98,13 @@
}
}
- if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) {
- prepareNextLogFile();
- appendPage.isFull(true);
- getAndInitNewPage();
- } else if (!appendPage.hasSpace(logRecord.getLogSize())) {
- appendPage.isFull(true);
- if (logRecord.getLogSize() > logPageSize) {
- getAndInitNewLargePage(logRecord.getLogSize());
- } else {
- getAndInitNewPage();
+ final int logRecordSize = logRecord.getLogSize();
+ if (!appendPage.hasSpace(logRecordSize)) {
+ if (getLogFileOffset(appendLSN.get()) + logRecordSize > logFileSize) {
+ prepareNextLogFile();
}
+ appendPage.isFull(true);
+ getAndInitNewPage(logRecordSize);
}
appendPage.appendWithReplication(logRecord, appendLSN.get());
@@ -116,7 +112,7 @@
logRecord.setLSN(appendLSN.get());
}
- appendLSN.addAndGet(logRecord.getLogSize());
+ appendLSN.addAndGet(logRecordSize);
}
@Override