[ASTERIXDB-2491][TXN] Recovery fixes
- user model changes: no
- storage format changes: yes[1]
- interface changes: no
Details:
- Change field offset type from integer16 to integer32
- Add recovery test
- Add version to log entries
[1] LogRecord format change with this patch; old LogRecord format can still
be processed, however old instances cannot read new log format
Change-Id: Iaf14b9a73a0239763bfeb0ce2d81cf952e6d72d3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3065
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogRecordTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogRecordTest.java
new file mode 100644
index 0000000..b7924d7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogRecordTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.txn;
+
+import static org.apache.asterix.common.transactions.LogConstants.LOG_SOURCE_MAX;
+import static org.apache.asterix.common.transactions.LogConstants.LOG_SOURCE_MIN;
+import static org.apache.asterix.common.transactions.LogConstants.VERSION_MAX;
+import static org.apache.asterix.common.transactions.LogConstants.VERSION_MIN;
+
+import java.nio.ByteBuffer;
+import java.util.stream.IntStream;
+
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+@FixMethodOrder(MethodSorters.JVM)
+public class LogRecordTest {
+ private static ByteBuffer buffer;
+
+ @BeforeClass
+ public static void setup() {
+ buffer = ByteBuffer.allocate(100);
+ }
+
+ @Test
+ @SuppressWarnings("squid:S3415")
+ public void testVersionIdLogSourceRange() {
+ Assert.assertEquals("min version", 0, VERSION_MIN);
+ Assert.assertEquals("max version", 62, VERSION_MAX);
+ Assert.assertEquals("min source", 0, LOG_SOURCE_MIN);
+ Assert.assertEquals("max source", 3, LOG_SOURCE_MAX);
+ IntStream.rangeClosed(LOG_SOURCE_MIN, LOG_SOURCE_MAX).forEach(
+ s -> IntStream.rangeClosed(VERSION_MIN, VERSION_MAX).forEach(v -> testVersionSourceCombo(v, (byte) s)));
+ }
+
+ @Test
+ public void testIllegalVersionIds() {
+ try {
+ testVersionSourceCombo(63, LogSource.LOCAL);
+ Assert.fail("expected IllegalArgumentException on version overflow not found");
+ } catch (IllegalArgumentException e) {
+ // ignore - expected
+ }
+ try {
+ testVersionSourceCombo(-1, LogSource.LOCAL);
+ Assert.fail("expected IllegalArgumentException on version underflow not found");
+ } catch (IllegalArgumentException e) {
+ // ignore - expected
+ }
+ }
+
+ @Test
+ public void testIllegalLogSources() {
+ LogRecord record = new LogRecord();
+ try {
+ record.setLogSource((byte) -1);
+ Assert.fail("expected IllegalArgumentException on log source underflow not found");
+ } catch (IllegalArgumentException e) {
+ // ignore - expected
+ }
+ try {
+ record.setLogSource((byte) 4);
+ Assert.fail("expected IllegalArgumentException on log source overflow not found");
+ } catch (IllegalArgumentException e) {
+ // ignore - expected
+ }
+ }
+
+ private void testVersionSourceCombo(int version, byte source) {
+ buffer.clear();
+ LogRecord record = new LogRecord();
+ record.setLogType(LogType.FLUSH);
+ record.setVersion(version);
+ record.setLogSource(source);
+ record.computeAndSetLogSize();
+ Assert.assertEquals("input version", version, record.getVersion());
+ Assert.assertEquals("input source", source, record.getLogSource());
+ record.writeLogRecord(buffer);
+
+ buffer.flip();
+ LogRecord read = new LogRecord();
+ read.readLogRecord(buffer);
+ Assert.assertEquals("read version", version, read.getVersion());
+ Assert.assertEquals("read source", source, read.getLogSource());
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 04f9751..1dec3fe 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -31,42 +31,6 @@
LARGE_RECORD
}
- int CHKSUM_LEN = Long.BYTES;
- int FLDCNT_LEN = Integer.BYTES;
- int DS_LEN = Integer.BYTES;
- int LOG_SOURCE_LEN = Byte.BYTES;
- int LOGRCD_SZ_LEN = Integer.BYTES;
- int NEWOP_LEN = Byte.BYTES;
- int NEWVALSZ_LEN = Integer.BYTES;
- int PKHASH_LEN = Integer.BYTES;
- int PKSZ_LEN = Integer.BYTES;
- int PRVLSN_LEN = Long.BYTES;
- int RS_PARTITION_LEN = Integer.BYTES;
- int RSID_LEN = Long.BYTES;
- int SEQ_NUM_LEN = Long.BYTES;
- int TYPE_LEN = Byte.BYTES;
- int UUID_LEN = Long.BYTES;
- int FLUSHING_COMPONENT_MINID_LEN = Long.BYTES;
- int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES;
-
- int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES;
- int ENTITY_RESOURCE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES;
- int ENTITY_VALUE_HEADER_LEN = PKHASH_LEN + PKSZ_LEN;
- int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
- int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-
- int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
- int ENTITY_COMMIT_LOG_BASE_SIZE =
- ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + ENTITY_VALUE_HEADER_LEN + CHKSUM_LEN;
- int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER;
- int FILTER_LOG_BASE_SIZE =
- ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + UPDATE_BODY_HEADER + UPDATE_LSN_HEADER + CHKSUM_LEN;
- int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + FLUSHING_COMPONENT_MINID_LEN
- + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN;
- int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
- int MARKER_BASE_LOG_SIZE =
- ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
-
RecordReadStatus readLogRecord(ByteBuffer buffer);
void writeLogRecord(ByteBuffer buffer);
@@ -93,7 +57,7 @@
int getPKHashValue();
- void setPKHashValue(int PKHashValue);
+ void setPKHashValue(int pkHashValue);
long getResourceId();
@@ -133,7 +97,7 @@
void computeAndSetPKValueSize();
- void setPKValue(ITupleReference PKValue);
+ void setPKValue(ITupleReference pKValue);
void readRemoteLog(ByteBuffer buffer);
@@ -191,4 +155,8 @@
long getFlushingComponentMaxId();
void setFlushingComponentMaxId(long flushingComponentMaxId);
+
+ int getVersion();
+
+ void setVersion(int version);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogConstants.java
new file mode 100644
index 0000000..58d020f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogConstants.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+public class LogConstants {
+
+ public static final int CHKSUM_LEN = Long.BYTES;
+ public static final int FLDCNT_LEN = Integer.BYTES;
+ public static final int DS_LEN = Integer.BYTES;
+ public static final int LOG_SOURCE_LEN = Byte.BYTES;
+ public static final int LOGRCD_SZ_LEN = Integer.BYTES;
+ public static final int NEWOP_LEN = Byte.BYTES;
+ public static final int NEWVALSZ_LEN = Integer.BYTES;
+ public static final int PKHASH_LEN = Integer.BYTES;
+ public static final int PKSZ_LEN = Integer.BYTES;
+ public static final int PRVLSN_LEN = Long.BYTES;
+ public static final int RS_PARTITION_LEN = Integer.BYTES;
+ public static final int RSID_LEN = Long.BYTES;
+ public static final int SEQ_NUM_LEN = Long.BYTES;
+ public static final int TYPE_LEN = Byte.BYTES;
+ public static final int UUID_LEN = Long.BYTES;
+ public static final int FLUSHING_COMPONENT_MINID_LEN = Long.BYTES;
+ public static final int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES;
+
+ public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES;
+ public static final int ENTITY_RESOURCE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES;
+ public static final int ENTITY_VALUE_HEADER_LEN = PKHASH_LEN + PKSZ_LEN;
+ public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+ public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+
+ public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+ public static final int ENTITY_COMMIT_LOG_BASE_SIZE =
+ ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + ENTITY_VALUE_HEADER_LEN + CHKSUM_LEN;
+ public static final int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER;
+ public static final int FILTER_LOG_BASE_SIZE =
+ ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + UPDATE_BODY_HEADER + UPDATE_LSN_HEADER + CHKSUM_LEN;
+ public static final int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN
+ + FLUSHING_COMPONENT_MINID_LEN + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN;
+ public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+ public static final int MARKER_BASE_LOG_SIZE =
+ ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
+
+ public static final int V_0 = 0;
+ public static final int V_1 = 1;
+ public static final int V_CURRENT = V_1;
+ public static final int VERSION_MIN = 0;
+ public static final int VERSION_MAX = (0xff >> 2) - 1;
+
+ public static final int LOG_SOURCE_MIN = 0;
+ public static final int LOG_SOURCE_MAX = (1 << 2) - 1;
+
+ private LogConstants() {
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 7a1079d..cfacf71 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.transactions;
+import static org.apache.asterix.common.transactions.LogConstants.*;
+
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,14 +27,15 @@
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReferenceV0;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
/**
* == LogRecordFormat ==
* ---------------------------
* [Header1] (10 bytes) : for all log types
- * LogSource(1)
+ * LogSourceVersion(1) : high 6 bits are used for log record version; low 2 bits are reserved for LogSource
* LogType(1)
* TxnId(8)
* ---------------------------
@@ -60,15 +63,15 @@
* ---------------------------
*/
public class LogRecord implements ILogRecord {
-
// ------------- fields in a log record (begin) ------------//
- private byte logSource;
+ private int version = V_CURRENT;
+ private byte logSource = LogSource.LOCAL;
private byte logType;
private long txnId;
private int datasetId;
- private int PKHashValue;
- private int PKValueSize;
- private ITupleReference PKValue;
+ private int pKHashValue;
+ private int pKValueSize;
+ private ITupleReference pKValue;
private long resourceId;
private int resourcePartition;
private int logSize;
@@ -86,15 +89,17 @@
private long flushingComponentMaxId;
// ------------- fields in a log record (end) --------------//
private final ILogMarkerCallback callback; // A callback for log mark operations
- private int PKFieldCnt;
+ private int pKFieldCnt;
private ITransactionContext txnCtx;
private volatile long LSN;
private final AtomicBoolean isFlushed;
private final PrimaryKeyTupleReference readPKValue;
- private final SimpleTupleReference readNewValue;
- private final SimpleTupleReference readOldValue;
+ private final ITreeIndexTupleReference readNewValue;
+ private final ITreeIndexTupleReference readOldValue;
+ private ITreeIndexTupleReference readNewValueV0;
+ private ITreeIndexTupleReference readOldValueV0;
private final CRC32 checksumGen;
- private int[] PKFields;
+ private int[] pKFields;
private PrimaryIndexOperationTracker opTracker;
/**
@@ -113,7 +118,6 @@
readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
readOldValue = SimpleTupleWriter.INSTANCE.createTupleReference();
checksumGen = new CRC32();
- logSource = LogSource.LOCAL;
}
public LogRecord() {
@@ -121,7 +125,7 @@
}
private void doWriteLogRecord(ByteBuffer buffer) {
- buffer.put(logSource);
+ buffer.put((byte) (version << 2 | (logSource & 0xff)));
buffer.put(logType);
buffer.putLong(txnId);
switch (logType) {
@@ -172,11 +176,11 @@
}
private void writeEntityValue(ByteBuffer buffer) {
- buffer.putInt(PKHashValue);
- if (PKValueSize <= 0) {
+ buffer.putInt(pKHashValue);
+ if (pKValueSize <= 0) {
throw new IllegalStateException("Primary Key Size is less than or equal to 0");
}
- buffer.putInt(PKValueSize);
+ buffer.putInt(pKValueSize);
writePKValue(buffer);
}
@@ -203,13 +207,13 @@
private void writePKValue(ByteBuffer buffer) {
if (logSource == LogSource.LOCAL) {
- for (int i = 0; i < PKFieldCnt; i++) {
- buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]),
- PKValue.getFieldLength(PKFields[i]));
+ for (int i = 0; i < pKFieldCnt; i++) {
+ buffer.put(pKValue.getFieldData(0), pKValue.getFieldStart(pKFields[i]),
+ pKValue.getFieldLength(pKFields[i]));
}
} else {
- // since PKValue is already serialized in remote logs, just put it into buffer
- buffer.put(PKValue.getFieldData(0), 0, PKValueSize);
+ // since pKValue is already serialized in remote logs, just put it into buffer
+ buffer.put(pKValue.getFieldData(0), 0, pKValueSize);
}
}
@@ -253,7 +257,22 @@
if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
return RecordReadStatus.TRUNCATED;
}
- logSource = buffer.get();
+ byte logSourceVersion = buffer.get();
+ logSource = (byte) (logSourceVersion & 0x3);
+ version = (byte) ((logSourceVersion & 0xff) >> 2);
+ ITreeIndexTupleReference readOld;
+ ITreeIndexTupleReference readNew;
+ if (version == V_0) {
+ if (readOldValueV0 == null) {
+ readOldValueV0 = new SimpleTupleReferenceV0();
+ readNewValueV0 = new SimpleTupleReferenceV0();
+ }
+ readOld = readOldValueV0;
+ readNew = readNewValueV0;
+ } else {
+ readOld = readOldValue;
+ readNew = readNewValue;
+ }
logType = buffer.get();
txnId = buffer.getLong();
switch (logType) {
@@ -276,7 +295,7 @@
case LogType.JOB_COMMIT:
case LogType.ABORT:
datasetId = -1;
- PKHashValue = -1;
+ pKHashValue = -1;
computeAndSetLogSize();
break;
case LogType.ENTITY_COMMIT:
@@ -288,7 +307,7 @@
break;
case LogType.UPDATE:
if (readEntityResource(buffer) && readEntityValue(buffer)) {
- return readUpdateInfo(buffer);
+ return readUpdateInfo(buffer, readNew, readOld);
} else {
return RecordReadStatus.TRUNCATED;
}
@@ -316,7 +335,7 @@
break;
case LogType.FILTER:
if (readEntityResource(buffer)) {
- return readUpdateInfo(buffer);
+ return readUpdateInfo(buffer, readNew, readOld);
} else {
return RecordReadStatus.TRUNCATED;
}
@@ -331,16 +350,16 @@
if (buffer.remaining() < ENTITY_VALUE_HEADER_LEN) {
return false;
}
- PKHashValue = buffer.getInt();
- PKValueSize = buffer.getInt();
+ pKHashValue = buffer.getInt();
+ pKValueSize = buffer.getInt();
// attempt to read in the PK
- if (buffer.remaining() < PKValueSize) {
+ if (buffer.remaining() < pKValueSize) {
return false;
}
- if (PKValueSize <= 0) {
+ if (pKValueSize <= 0) {
throw new IllegalStateException("Primary Key Size is less than or equal to 0");
}
- PKValue = readPKValue(buffer);
+ pKValue = readPKValue(buffer);
return true;
}
@@ -354,7 +373,8 @@
return true;
}
- private RecordReadStatus readUpdateInfo(ByteBuffer buffer) {
+ private RecordReadStatus readUpdateInfo(ByteBuffer buffer, ITreeIndexTupleReference newRead,
+ ITreeIndexTupleReference oldRead) {
if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
return RecordReadStatus.TRUNCATED;
}
@@ -369,7 +389,7 @@
}
return RecordReadStatus.TRUNCATED;
}
- newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+ newValue = readTuple(buffer, newRead, newValueFieldCount, newValueSize);
if (logSize > getUpdateLogSizeWithoutOldValue()) {
// Prev Image exists
if (buffer.remaining() < Integer.BYTES) {
@@ -383,7 +403,7 @@
if (buffer.remaining() < oldValueSize) {
return RecordReadStatus.TRUNCATED;
}
- oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+ oldValue = readTuple(buffer, oldRead, oldValueFieldCount, oldValueSize);
} else {
oldValueSize = 0;
oldValue = null;
@@ -402,15 +422,15 @@
}
private ITupleReference readPKValue(ByteBuffer buffer) {
- if (buffer.position() + PKValueSize > buffer.limit()) {
+ if (buffer.position() + pKValueSize > buffer.limit()) {
throw new BufferUnderflowException();
}
- readPKValue.reset(buffer.array(), buffer.position(), PKValueSize);
- buffer.position(buffer.position() + PKValueSize);
+ readPKValue.reset(buffer.array(), buffer.position(), pKValueSize);
+ buffer.position(buffer.position() + pKValueSize);
return readPKValue;
}
- private static ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt,
+ private static ITupleReference readTuple(ByteBuffer srcBuffer, ITreeIndexTupleReference destTuple, int fieldCnt,
int size) {
if (srcBuffer.position() + size > srcBuffer.limit()) {
throw new BufferUnderflowException();
@@ -424,9 +444,9 @@
@Override
public void computeAndSetPKValueSize() {
int i;
- PKValueSize = 0;
- for (i = 0; i < PKFieldCnt; i++) {
- PKValueSize += PKValue.getFieldLength(PKFields[i]);
+ pKValueSize = 0;
+ for (i = 0; i < pKFieldCnt; i++) {
+ pKValueSize += pKValue.getFieldLength(pKFields[i]);
}
}
@@ -442,7 +462,7 @@
}
private int getUpdateLogSizeWithoutOldValue() {
- return UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
+ return UPDATE_LOG_BASE_SIZE + pKValueSize + newValueSize;
}
@Override
@@ -456,7 +476,7 @@
logSize = JOB_TERMINATE_LOG_SIZE;
break;
case LogType.ENTITY_COMMIT:
- logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
+ logSize = ENTITY_COMMIT_LOG_BASE_SIZE + pKValueSize;
break;
case LogType.FLUSH:
logSize = FLUSH_LOG_SIZE;
@@ -491,13 +511,14 @@
if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
builder.append(" DatasetId : ").append(datasetId);
builder.append(" ResourcePartition : ").append(resourcePartition);
- builder.append(" PKHashValue : ").append(PKHashValue);
- builder.append(" PKFieldCnt : ").append(PKFieldCnt);
- builder.append(" PKSize: ").append(PKValueSize);
+ builder.append(" PKHashValue : ").append(pKHashValue);
+ builder.append(" PKFieldCnt : ").append(pKFieldCnt);
+ builder.append(" PKSize: ").append(pKValueSize);
}
if (logType == LogType.UPDATE) {
builder.append(" ResourceId : ").append(resourceId);
}
+ builder.append(" Version : ").append(version);
return builder.toString();
}
@@ -557,12 +578,12 @@
@Override
public int getPKHashValue() {
- return PKHashValue;
+ return pKHashValue;
}
@Override
- public void setPKHashValue(int PKHashValue) {
- this.PKHashValue = PKHashValue;
+ public void setPKHashValue(int pKHashValue) {
+ this.pKHashValue = pKHashValue;
}
@Override
@@ -644,23 +665,23 @@
@Override
public int getPKValueSize() {
- return PKValueSize;
+ return pKValueSize;
}
@Override
public ITupleReference getPKValue() {
- return PKValue;
+ return pKValue;
}
@Override
public void setPKFields(int[] primaryKeyFields) {
- PKFields = primaryKeyFields;
- PKFieldCnt = PKFields.length;
+ pKFields = primaryKeyFields;
+ pKFieldCnt = pKFields.length;
}
@Override
- public void setPKValue(ITupleReference PKValue) {
- this.PKValue = PKValue;
+ public void setPKValue(ITupleReference pKValue) {
+ this.pKValue = pKValue;
}
public PrimaryIndexOperationTracker getOpTracker() {
@@ -669,6 +690,11 @@
@Override
public void setLogSource(byte logSource) {
+ if (logSource < LOG_SOURCE_MIN) {
+ throw new IllegalArgumentException("logSource underflow: " + logSource);
+ } else if (logSource > LOG_SOURCE_MAX) {
+ throw new IllegalArgumentException("logSource overflow: " + logSource);
+ }
this.logSource = logSource;
}
@@ -678,7 +704,7 @@
}
public void setPKFieldCnt(int pKFieldCnt) {
- PKFieldCnt = pKFieldCnt;
+ this.pKFieldCnt = pKFieldCnt;
}
public void setOpTracker(PrimaryIndexOperationTracker opTracker) {
@@ -782,4 +808,19 @@
public void setFlushingComponentMaxId(long flushingComponentMaxId) {
this.flushingComponentMaxId = flushingComponentMaxId;
}
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void setVersion(int version) {
+ if (version < VERSION_MIN) {
+ throw new IllegalArgumentException("version underflow: " + version);
+ } else if (version > VERSION_MAX) {
+ throw new IllegalArgumentException("version overflow: " + version);
+ }
+ this.version = (byte) version;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index 690eeb6..25ff401 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -58,15 +58,15 @@
}
public static void formEntityCommitLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
- int PKHashValue, ITupleReference PKValue, int[] PKFields, int resourcePartition, byte entityCommitType) {
+ int pKHashValue, ITupleReference pKValue, int[] pKFields, int resourcePartition, byte entityCommitType) {
logRecord.setTxnCtx(txnCtx);
logRecord.setLogType(entityCommitType);
logRecord.setTxnId(txnCtx.getTxnId().getId());
logRecord.setDatasetId(datasetId);
- logRecord.setPKHashValue(PKHashValue);
- logRecord.setPKFieldCnt(PKFields.length);
- logRecord.setPKValue(PKValue);
- logRecord.setPKFields(PKFields);
+ logRecord.setPKHashValue(pKHashValue);
+ logRecord.setPKFieldCnt(pKFields.length);
+ logRecord.setPKValue(pKValue);
+ logRecord.setPKFields(pKFields);
logRecord.setResourcePartition(resourcePartition);
logRecord.computeAndSetPKValueSize();
logRecord.computeAndSetLogSize();
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java
index 308920b..9123a5d 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/test/server/RecoveryIT.java
@@ -19,7 +19,6 @@
package org.apache.asterix.test.server;
import java.io.File;
-import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.1.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.1.script.aql
new file mode 100644
index 0000000..7d441cd
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.1.script.aql
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+create_and_start.sh
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.2.ddl.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.2.ddl.aql
new file mode 100644
index 0000000..02f3a72
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.2.ddl.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : large_object_100K
+ * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB)
+ * Expected Result : Success
+ * Date : December 25 2018
+ */
+
+drop dataverse recovery if exists;
+create dataverse recovery;
+use dataverse recovery;
+
+create type RecoveryType as {
+ uid: uuid,
+ numbers: [int]
+};
+
+create dataset RecoveryDataset (RecoveryType)
+primary key uid autogenerated;
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.3.update.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.3.update.aql
new file mode 100644
index 0000000..1e24bcf
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.3.update.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : large_object_100K
+ * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB)
+ * Expected Result : Success
+ * Date : December 25 2018
+ */
+
+use dataverse recovery;
+
+//Create a 100KB record (8-bytes * 12800 = 102400 bytes)
+insert into dataset RecoveryDataset (
+ {"numbers": range(1, 12800)}
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.4.txnqbc.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.4.txnqbc.aql
new file mode 100644
index 0000000..2e19c39
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.4.txnqbc.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : large_object_100K
+ * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB)
+ * Expected Result : Success
+ * Date : December 25 2018
+ */
+
+use dataverse recovery;
+
+for $x in dataset RecoveryDataset
+return count($x.numbers);
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.5.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.5.script.aql
new file mode 100644
index 0000000..4583455
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.5.script.aql
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+kill_cc_and_nc.sh
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.6.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.6.script.aql
new file mode 100644
index 0000000..7087cd3
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.6.script.aql
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+stop_and_start.sh
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.7.txnqar.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.7.txnqar.aql
new file mode 100644
index 0000000..2e19c39
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.7.txnqar.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : large_object_100K
+ * Description : ASTERIXDB-2491 (Recovery fails for objects with size larger than 32KB)
+ * Expected Result : Success
+ * Date : December 25 2018
+ */
+
+use dataverse recovery;
+
+for $x in dataset RecoveryDataset
+return count($x.numbers);
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.8.script.aql b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.8.script.aql
new file mode 100644
index 0000000..40df6fb
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/queries/recover_after_abort/large_object_100K/large_object_100K.8.script.aql
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+stop_and_delete.sh
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/create_and_start.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/create_and_start.sh
new file mode 100755
index 0000000..e358618
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/create_and_start.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+$NCSERVICE_HOME/opt/local/bin/start-sample-cluster.sh 1>/dev/null 2>&1;
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/kill_cc_and_nc.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/kill_cc_and_nc.sh
new file mode 100755
index 0000000..b6326cc
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/kill_cc_and_nc.sh
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9
+ps -ef | awk '/java.*org\.apache\.hyracks\.control\.nc\.service\.NCService/ {print $2}' | xargs -n 1 kill -9
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_delete.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_delete.sh
new file mode 100755
index 0000000..818d17d
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_delete.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+$NCSERVICE_HOME/opt/local/bin/stop-sample-cluster.sh;
+rm -rf $NCSERVICE_HOME/opt/local/data;
+
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_start.sh b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_start.sh
new file mode 100755
index 0000000..9a0c506
--- /dev/null
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/scripts/recover_after_abort/large_object_100K/stop_and_start.sh
@@ -0,0 +1,19 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+$NCSERVICE_HOME/opt/local/bin/stop-sample-cluster.sh;
+$NCSERVICE_HOME/opt/local/bin/start-sample-cluster.sh;
diff --git a/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml b/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml
index 38179b2..8343e85 100644
--- a/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml
+++ b/asterixdb/asterix-server/src/test/resources/transactionts/testsuite.xml
@@ -163,6 +163,12 @@
<!-- <expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException</expected-error> -->
</compilation-unit>
</test-case>
+
+ <test-case FilePath="recover_after_abort">
+ <compilation-unit name="large_object_100K">
+ <output-dir compare="Text">large_object_100K</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="recovery_ddl">
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 28290bb..1aa040d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.transactions.ILogRequester;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogConstants;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
@@ -74,7 +75,7 @@
full = new AtomicBoolean(false);
appendOffset = 0;
flushOffset = 0;
- syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+ syncCommitQ = new LinkedBlockingQueue<>(logPageSize / LogConstants.JOB_TERMINATE_LOG_SIZE);
flushQ = new LinkedBlockingQueue<>();
remoteJobsQ = new LinkedBlockingQueue<>();
reusableTxnId = new MutableTxnId(-1);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index a990379..e66185c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -19,7 +19,6 @@
package org.apache.asterix.transaction.management.service.logging;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
@@ -47,6 +46,7 @@
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogConstants;
import org.apache.asterix.common.transactions.LogManagerProperties;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
@@ -83,8 +83,8 @@
private FileChannel appendChannel;
private ILogBuffer appendPage;
private LogFlusher logFlusher;
- private Future<? extends Object> futureLogFlusher;
- protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
+ private Future<?> futureLogFlusher;
+ private LinkedBlockingQueue<ILogRecord> flushLogsQ;
private long currentLogFileId;
public LogManager(ITransactionSubsystem txnSubsystem) {
@@ -448,15 +448,7 @@
if (!fileLogDir.isDirectory()) {
throw new IllegalStateException("log dir " + logDir + " exists but it is not a directory");
}
- logFileNames = fileLogDir.list(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- if (name.startsWith(logFilePrefix)) {
- return true;
- }
- return false;
- }
- });
+ logFileNames = fileLogDir.list((dir, name) -> name.startsWith(logFilePrefix));
if (logFileNames == null) {
throw new IllegalStateException("listing of log dir (" + logDir + ") files returned null. "
+ "Either an IO error occurred or the dir was just deleted by another process/thread");
@@ -627,7 +619,7 @@
class LogFlusher implements Callable<Boolean> {
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
- private static final ILogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
+ private static final ILogBuffer POISON_PILL = new LogBuffer(null, LogConstants.JOB_TERMINATE_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<ILogBuffer> emptyQ;
private final LinkedBlockingQueue<ILogBuffer> flushQ;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 3d79adc..30caab7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -239,37 +239,37 @@
throw new ACIDException(e);
}
- ByteBuffer readBuffer = this.readBuffer;
+ readRecord(lsn);
+ logRecord.setLSN(readLSN);
+ readLSN += logRecord.getLogSize();
+ return logRecord;
+ }
+
+ private void readRecord(long lsn) {
+ ByteBuffer buffer = this.readBuffer;
while (true) {
- RecordReadStatus status = logRecord.readLogRecord(readBuffer);
+ RecordReadStatus status = logRecord.readLogRecord(buffer);
switch (status) {
- case LARGE_RECORD: {
- readBuffer = ByteBuffer.allocate(logRecord.getLogSize());
- fillLogReadBuffer(logRecord.getLogSize(), readBuffer);
+ case LARGE_RECORD:
+ buffer = ByteBuffer.allocate(logRecord.getLogSize());
+ fillLogReadBuffer(logRecord.getLogSize(), buffer);
//now see what we have in the refilled buffer
- continue;
- }
- case TRUNCATED: {
+ break;
+ case TRUNCATED:
if (!fillLogReadBuffer()) {
throw new IllegalStateException(
"Could not read LSN(" + lsn + ") from log file id " + logFile.getLogFileId());
}
//now read the complete log record
- continue;
- }
- case BAD_CHKSUM: {
- throw new ACIDException("Log record has incorrect checksum");
- }
- case OK:
break;
+ case BAD_CHKSUM:
+ throw new ACIDException("Log record has incorrect checksum");
+ case OK:
+ return;
default:
throw new IllegalStateException("Unexpected log read status: " + status);
}
- break;
}
- logRecord.setLSN(readLSN);
- readLSN += logRecord.getLogSize();
- return logRecord;
}
private void getLogFile() {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java
index af74b13..cd13de5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/TxnEntityId.java
@@ -18,10 +18,10 @@
*/
package org.apache.asterix.transaction.management.service.recovery;
+import static org.apache.asterix.common.transactions.LogConstants.*;
+
import java.nio.ByteBuffer;
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -165,7 +165,7 @@
public int getCurrentSize() {
//txn id, dataset id, pkHashValue, arraySize, isByteArrayPKValue
- int size = TxnId.BYTES + ILogRecord.DS_LEN + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES;
+ int size = TxnId.BYTES + DS_LEN + PKHASH_LEN + PKSZ_LEN + Byte.BYTES;
//byte arraySize
if (isByteArrayPKValue && byteArrayPKValue != null) {
size += byteArrayPKValue.length;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java
index e82b037..cc485ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReference.java
@@ -19,7 +19,7 @@
package org.apache.hyracks.storage.am.common.tuples;
-import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import org.apache.hyracks.storage.am.common.util.BitOperationUtils;
@@ -71,10 +71,10 @@
@Override
public int getFieldLength(int fIdx) {
if (fIdx == 0) {
- return ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes);
+ return IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes);
} else {
- return ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + fIdx * 2)
- - ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * 2));
+ return IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + fIdx * Integer.BYTES)
+ - IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * Integer.BYTES));
}
}
@@ -84,7 +84,7 @@
return tupleStartOff + nullFlagsBytes + fieldSlotsBytes;
} else {
return tupleStartOff + nullFlagsBytes + fieldSlotsBytes
- + ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * 2));
+ + IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * Integer.BYTES));
}
}
@@ -93,12 +93,12 @@
}
protected int getFieldSlotsBytes() {
- return fieldCount * 2;
+ return fieldCount * Integer.BYTES;
}
@Override
public int getTupleSize() {
return nullFlagsBytes + fieldSlotsBytes
- + ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + (fieldCount - 1) * 2);
+ + IntegerPointable.getInteger(buf, tupleStartOff + nullFlagsBytes + (fieldCount - 1) * Integer.BYTES);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReferenceV0.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReferenceV0.java
new file mode 100644
index 0000000..0b4e8c5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleReferenceV0.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.common.tuples;
+
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+
+/**
+ * This class is only used to read tuple references from log version 0
+ */
+public class SimpleTupleReferenceV0 extends SimpleTupleReference {
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ if (fIdx == 0) {
+ return ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes);
+ } else {
+ return ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + fIdx * 2)
+ - ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * 2));
+ }
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ if (fIdx == 0) {
+ return tupleStartOff + nullFlagsBytes + fieldSlotsBytes;
+ } else {
+ return tupleStartOff + nullFlagsBytes + fieldSlotsBytes
+ + ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + ((fIdx - 1) * 2));
+ }
+ }
+
+ @Override
+ protected int getFieldSlotsBytes() {
+ return fieldCount * 2;
+ }
+
+ @Override
+ public int getTupleSize() {
+ return nullFlagsBytes + fieldSlotsBytes
+ + ShortPointable.getShort(buf, tupleStartOff + nullFlagsBytes + (fieldCount - 1) * 2);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
index 410a0e3..ca7217e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.am.common.util.BitOperationUtils;
@@ -35,12 +36,6 @@
private SimpleTupleWriter() {
}
- // Write short in little endian to target byte array at given offset.
- private static void writeShortL(short s, byte[] buf, int targetOff) {
- buf[targetOff] = (byte) (s >> 8);
- buf[targetOff + 1] = (byte) (s >> 0);
- }
-
@Override
public int bytesRequired(ITupleReference tuple) {
int bytes = getNullFlagsBytes(tuple) + getFieldSlotsBytes(tuple);
@@ -83,7 +78,7 @@
System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), targetBuf, runner, tuple.getFieldLength(i));
fieldEndOff += tuple.getFieldLength(i);
runner += tuple.getFieldLength(i);
- writeShortL((short) fieldEndOff, targetBuf, targetOff + nullFlagsBytes + i * 2);
+ IntegerPointable.setInteger(targetBuf, targetOff + nullFlagsBytes + i * Integer.BYTES, fieldEndOff);
}
return runner - targetOff;
}
@@ -103,7 +98,8 @@
System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), targetBuf, runner, tuple.getFieldLength(i));
fieldEndOff += tuple.getFieldLength(i);
runner += tuple.getFieldLength(i);
- writeShortL((short) fieldEndOff, targetBuf, targetOff + nullFlagsBytes + fieldCounter * 2);
+ IntegerPointable.setInteger(targetBuf, targetOff + nullFlagsBytes + fieldCounter * Integer.BYTES,
+ fieldEndOff);
fieldCounter++;
}
@@ -115,7 +111,7 @@
}
protected int getFieldSlotsBytes(ITupleReference tuple) {
- return tuple.getFieldCount() * 2;
+ return tuple.getFieldCount() * Integer.BYTES;
}
protected int getNullFlagsBytes(ITupleReference tuple, int startField, int numFields) {
@@ -123,7 +119,7 @@
}
protected int getFieldSlotsBytes(ITupleReference tuple, int startField, int numFields) {
- return numFields * 2;
+ return numFields * Integer.BYTES;
}
@Override