Optionally log the before image when it is found in the memory component
In addition, this change fixes an issue with one of the test cases for
FrameSpiller.
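
For reviewers, a minimal self-contained sketch of the new UPDATE log sizing introduced here, mirroring LogRecord.setUpdateLogSize()/getUpdateLogSizeWithoutOldValue() in this diff. The value of UPDATE_LOG_BASE_SIZE below is a placeholder for illustration only; the real constant lives in LogRecord and is not shown in this change.

    public class BeforeImageLogSizeSketch {
        // Placeholder value; not the real UPDATE_LOG_BASE_SIZE constant from LogRecord.
        private static final int UPDATE_LOG_BASE_SIZE = 59;

        // The old value contributes its size header, field-count header, and serialized
        // tuple bytes only when a before image was actually found and logged.
        static int updateLogSize(int pkValueSize, int newValueSize, int oldValueSize) {
            int size = UPDATE_LOG_BASE_SIZE + pkValueSize + newValueSize;
            if (oldValueSize > 0) {
                size += Integer.BYTES /*size*/ + Integer.BYTES /*fieldCount*/ + oldValueSize /*tuple*/;
            }
            return size;
        }

        public static void main(String[] args) {
            System.out.println(updateLogSize(8, 120, 0));  // no before image logged
            System.out.println(updateLogSize(8, 120, 96)); // before image found in memory component
        }
    }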
Change-Id: Iaaed48f4c2ca8d83253e81cd7c60aad998b67b1e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/900
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 45e3b06..507a393 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -132,7 +132,7 @@
/**
* Constructor which wraps an IApplicationConfig.
*/
- public AsterixPropertiesAccessor (IApplicationConfig cfg) {
+ public AsterixPropertiesAccessor(IApplicationConfig cfg) {
this.cfg = cfg;
instanceName = cfg.getString("asterix", "instance", "DEFAULT_INSTANCE");
String mdNode = null;
@@ -234,7 +234,8 @@
return interpreter.interpret(value);
} catch (IllegalArgumentException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
- StringBuilder msg = new StringBuilder("Invalid property value '" + value + "' for property '" + property + "'.\n");
+ StringBuilder msg =
+ new StringBuilder("Invalid property value '" + value + "' for property '" + property + "'.\n");
if (p != null) {
msg.append("See the description: \n" + p.getDescription() + "\n");
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 5445b11..9a76b40 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -48,7 +48,7 @@
/**
* The following three variables are used to keep track of the information regarding flushing partial frame such as
* 1. whether there was a partial frame flush for the current frame,
- * ==> captured in flushedPartialTuples variabl
+ * ==> captured in flushedPartialTuples variable
* 2. the last flushed tuple index in the frame if there was a partial frame flush,
* ==> captured in lastFlushedTupleIdx variable
* 3. the current tuple index the frame, where this operator is working on the current tuple.
@@ -89,8 +89,8 @@
tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
frameTuple = new FrameTupleReference();
}
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ IAsterixAppRuntimeContext runtimeCtx =
+ (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
AsterixLSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
} catch (Throwable th) {
throw new HyracksDataException(th);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 1992a00..cd05ba0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -78,8 +78,6 @@
public void setNewOp(byte newOp);
- public int getNewValueSize();
-
public void setNewValueSize(int newValueSize);
public ITupleReference getNewValue();
@@ -134,4 +132,10 @@
public boolean isReplicated();
public void writeRemoteLogRecord(ByteBuffer buffer);
+
+ public ITupleReference getOldValue();
+
+ public void setOldValue(ITupleReference oldValue);
+
+ public void setOldValueSize(int oldValueSize);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 4823a92..23fdd0f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -100,10 +100,13 @@
private long resourceId;
private int resourcePartition;
private int logSize;
- private int fieldCnt;
+ private int newValueFieldCount;
private byte newOp;
private int newValueSize;
private ITupleReference newValue;
+ private int oldValueSize;
+ private ITupleReference oldValue;
+ private int oldValueFieldCount;
private long checksum;
// ------------- fields in a log record (end) --------------//
@@ -111,9 +114,9 @@
private ITransactionContext txnCtx;
private long LSN;
private final AtomicBoolean isFlushed;
- private final SimpleTupleWriter tupleWriter;
private final PrimaryKeyTupleReference readPKValue;
private final SimpleTupleReference readNewValue;
+ private final SimpleTupleReference readOldValue;
private final CRC32 checksumGen;
private int[] PKFields;
private PrimaryIndexOperationTracker opTracker;
@@ -128,9 +131,9 @@
public LogRecord() {
isFlushed = new AtomicBoolean(false);
- tupleWriter = new SimpleTupleWriter();
readPKValue = new PrimaryKeyTupleReference();
- readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
+ readOldValue = SimpleTupleWriter.INSTANCE.createTupleReference();
checksumGen = new CRC32();
logSource = LogSource.LOCAL;
}
@@ -152,10 +155,15 @@
if (logType == LogType.UPDATE) {
buffer.putLong(resourceId);
buffer.putInt(logSize);
- buffer.putInt(fieldCnt);
+ buffer.putInt(newValueFieldCount);
buffer.put(newOp);
buffer.putInt(newValueSize);
writeTuple(buffer, newValue, newValueSize);
+ if (oldValueSize > 0) {
+ buffer.putInt(oldValueSize);
+ buffer.putInt(oldValueFieldCount);
+ writeTuple(buffer, oldValue, oldValueSize);
+ }
}
if (logType == LogType.FLUSH) {
buffer.putInt(datasetId);
@@ -195,7 +203,7 @@
private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
if (logSource == LogSource.LOCAL) {
- tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+ SimpleTupleWriter.INSTANCE.writeTuple(tuple, buffer.array(), buffer.position());
} else {
//since the tuple is already serialized in remote logs, just copy it from beginning to end.
System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), buffer.position(), size);
@@ -241,64 +249,93 @@
logSource = buffer.get();
logType = buffer.get();
jobId = buffer.getInt();
-
- if (logType == LogType.FLUSH) {
- if (buffer.remaining() < DatasetId.BYTES) {
- return RecordReadStatus.TRUNCATED;
- }
- datasetId = buffer.getInt();
- resourceId = 0l;
- computeAndSetLogSize();
- } else if (logType == LogType.WAIT) {
- computeAndSetLogSize();
- } else {
- if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
+ switch (logType) {
+ case LogType.FLUSH:
+ if (buffer.remaining() < DatasetId.BYTES) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ datasetId = buffer.getInt();
+ resourceId = 0L;
+ break;
+ case LogType.ABORT:
+ case LogType.JOB_COMMIT:
datasetId = -1;
PKHashValue = -1;
- } else {
- //attempt to read in the resourcePartition, dsid, PK hash and PK length
- if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
+ break;
+ case LogType.ENTITY_COMMIT:
+ case LogType.UPSERT_ENTITY_COMMIT:
+ if (!readEntityInfo(buffer)) {
return RecordReadStatus.TRUNCATED;
}
- resourcePartition = buffer.getInt();
- datasetId = buffer.getInt();
- PKHashValue = buffer.getInt();
- PKValueSize = buffer.getInt();
- // attempt to read in the PK
- if (buffer.remaining() < PKValueSize) {
+ break;
+ case LogType.UPDATE:
+ if (!readEntityInfo(buffer)) {
return RecordReadStatus.TRUNCATED;
}
- if (PKValueSize <= 0) {
- throw new IllegalStateException("Primary Key Size is less than or equal to 0");
- }
- PKValue = readPKValue(buffer);
- }
-
- if (logType == LogType.UPDATE) {
- // attempt to read in the previous LSN, log size, new value size, and new record type
if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
return RecordReadStatus.TRUNCATED;
}
resourceId = buffer.getLong();
logSize = buffer.getInt();
- fieldCnt = buffer.getInt();
+ newValueFieldCount = buffer.getInt();
newOp = buffer.get();
newValueSize = buffer.getInt();
- if (buffer.remaining() < newValueSize) {
- if (logSize > buffer.capacity()) {
- return RecordReadStatus.LARGE_RECORD;
- }
- return RecordReadStatus.TRUNCATED;
- }
- newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
- } else {
- computeAndSetLogSize();
- }
+ return readEntity(buffer);
+ default:
+ break;
}
-
+ computeAndSetLogSize();
return RecordReadStatus.OK;
}
+ private RecordReadStatus readEntity(ByteBuffer buffer) {
+ if (buffer.remaining() < newValueSize) {
+ if (logSize > buffer.capacity()) {
+ return RecordReadStatus.LARGE_RECORD;
+ }
+ return RecordReadStatus.TRUNCATED;
+ }
+ newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+ if (logSize > getUpdateLogSizeWithoutOldValue()) {
+ // Prev Image exists
+ if (buffer.remaining() < Integer.BYTES) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValueSize = buffer.getInt();
+ if (buffer.remaining() < Integer.BYTES) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValueFieldCount = buffer.getInt();
+ if (buffer.remaining() < oldValueSize) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+ } else {
+ oldValueSize = 0;
+ }
+ return RecordReadStatus.OK;
+ }
+
+ private boolean readEntityInfo(ByteBuffer buffer) {
+ //attempt to read in the resourcePartition, dsid, PK hash and PK length
+ if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
+ return false;
+ }
+ resourcePartition = buffer.getInt();
+ datasetId = buffer.getInt();
+ PKHashValue = buffer.getInt();
+ PKValueSize = buffer.getInt();
+ // attempt to read in the PK
+ if (buffer.remaining() < PKValueSize) {
+ return false;
+ }
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ }
+ PKValue = readPKValue(buffer);
+ return true;
+ }
+
@Override
public void readRemoteLog(ByteBuffer buffer) {
//read common fields
@@ -345,7 +382,14 @@
}
private void setUpdateLogSize() {
- logSize = UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
+ logSize = getUpdateLogSizeWithoutOldValue();
+ if (oldValueSize > 0) {
+ logSize += /*size*/Integer.BYTES + /*fieldCount*/Integer.BYTES + /*tuple*/oldValueSize;
+ }
+ }
+
+ private int getUpdateLogSizeWithoutOldValue() {
+ return UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
}
@Override
@@ -504,11 +548,6 @@
}
@Override
- public int getNewValueSize() {
- return newValueSize;
- }
-
- @Override
public void setNewValueSize(int newValueSize) {
this.newValueSize = newValueSize;
}
@@ -521,7 +560,7 @@
@Override
public void setNewValue(ITupleReference newValue) {
this.newValue = newValue;
- this.fieldCnt = newValue.getFieldCount();
+ this.newValueFieldCount = newValue.getFieldCount();
}
@Override
@@ -633,4 +672,20 @@
public boolean isReplicated() {
return replicated;
}
+
+ @Override
+ public ITupleReference getOldValue() {
+ return oldValue;
+ }
+
+ @Override
+ public void setOldValue(ITupleReference oldValue) {
+ this.oldValue = oldValue;
+ this.oldValueFieldCount = oldValue.getFieldCount();
+ }
+
+ @Override
+ public void setOldValueSize(int oldValueSize) {
+ this.oldValueSize = oldValueSize;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index 3becd96..bc1c328 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -679,47 +679,50 @@
*/
@org.junit.Test
public void testMemoryVarSizeFrameWithSpillNoDiscard() {
- try {
- Random random = new Random();
- IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
- // Spill budget = Memory budget, No discard
- FeedPolicyAccessor fpa =
- createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
- // Non-Active Writer
- TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
- writer.freeze();
- // FramePool
- ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
- FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
- handler.open();
- ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
- int multiplier = 1;
- // add NUM_FRAMES times
- while ((multiplier <= framePool.remaining())) {
- handler.nextFrame(buffer);
- multiplier = random.nextInt(10) + 1;
- buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ for (int k = 0; k < 1000; k++) {
+ try {
+ Random random = new Random();
+ IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+ // Spill budget = Memory budget, No discard
+ FeedPolicyAccessor fpa =
+ createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE);
+ // Non-Active Writer
+ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
+ writer.freeze();
+ // FramePool
+ ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
+ FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool);
+ handler.open();
+ ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE);
+ int multiplier = 1;
+ // add NUM_FRAMES times
+ while ((multiplier <= framePool.remaining())) {
+ handler.nextFrame(buffer);
+ multiplier = random.nextInt(10) + 1;
+ buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier);
+ }
+ // Next call should Not block. we will do it in a different thread
+ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
+ result.get();
+ // Check that no records were discarded
+ assertEquals(handler.getNumDiscarded(), 0);
+ // Check that one frame is spilled
+ assertEquals(handler.getNumSpilled(), 1);
+ int numOfBuffersInMemory = handler.getInternalBuffer().size();
+ // consume memory frames
+ while (numOfBuffersInMemory > 0) {
+ writer.kick();
+ numOfBuffersInMemory--;
+ }
+ // There should be 1 frame on disk
+ Assert.assertEquals(1, handler.framesOnDisk());
+ writer.unfreeze();
+ handler.close();
+ Assert.assertEquals(0, handler.framesOnDisk());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ Assert.fail();
}
- // Next call should Not block. we will do it in a different thread
- Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler));
- result.get();
- // Check that no records were discarded
- assertEquals(handler.getNumDiscarded(), 0);
- // Check that one frame is spilled
- assertEquals(handler.getNumSpilled(), 1);
- // consume memory frames
- while (!handler.getInternalBuffer().isEmpty()) {
- writer.kick();
- }
- // There should be 1 frame on disk
- Assert.assertEquals(1, handler.framesOnDisk());
- writer.unfreeze();
- result.get();
- handler.close();
- Assert.assertEquals(0, handler.framesOnDisk());
- } catch (Throwable th) {
- th.printStackTrace();
- Assert.fail();
}
Assert.assertNull(cause);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index b82c9ee..3b432a6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -315,7 +315,7 @@
// locks and secondary index doesn't.
return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
- transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
+ transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp, false);
}
@Override
@@ -996,10 +996,8 @@
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
- sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
- new ISerializerDeserializer[] {
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
- BuiltinType.ASTRING),
+ sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ASTRING) }));
@@ -1016,7 +1014,7 @@
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType> results)
- throws MetadataException, IndexException, IOException {
+ throws MetadataException, IndexException, IOException {
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
String resourceName = index.getFile().toString();
IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 28f8a79..323ea6d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -824,8 +824,8 @@
RTreeSearchOperatorDescriptor rtreeSearchOp;
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(
- comparatorFactories, primaryComparatorFactories);
+ IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+ getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -1135,7 +1135,7 @@
? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
: new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
- txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1614,8 +1614,8 @@
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE);
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+ dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1819,7 +1819,7 @@
ResourceType.LSM_INVERTED_INDEX)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX);
+ ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1889,8 +1889,8 @@
Pair<IAType, Boolean> keyPairType =
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
IAType spatialType = keyPairType.first;
- boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
- || spatialType.getTypeTag() == ATypeTag.POINT3D;
+ boolean isPointMBR =
+ spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
int numSecondaryKeys = dimension * 2;
int numPrimaryKeys = primaryKeys.size();
@@ -1969,14 +1969,14 @@
? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE);
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+ dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
- IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
- primaryComparatorFactories);
+ IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+ getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2300,7 +2300,7 @@
? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE)
: new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
- IndexOperation.UPSERT, ResourceType.LSM_BTREE);
+ IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart());
LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
@@ -2601,7 +2601,7 @@
ResourceType.LSM_INVERTED_INDEX)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
- ResourceType.LSM_INVERTED_INDEX);
+ ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -2666,8 +2666,8 @@
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType);
IAType spatialType = keyPairType.first;
- boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT
- || spatialType.getTypeTag() == ATypeTag.POINT3D;
+ boolean isPointMBR =
+ spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
int numSecondaryKeys = dimension * 2;
int numPrimaryKeys = primaryKeys.size();
@@ -2770,12 +2770,12 @@
ResourceType.LSM_RTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
- ResourceType.LSM_RTREE);
+ ResourceType.LSM_RTREE, dataset.hasMetaPart());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
- .getMergePolicyFactory(dataset, mdTxnCtx);
- IBinaryComparatorFactory[] deletedKeyBTreeCompFactories = getMergedComparatorFactories(comparatorFactories,
- primaryComparatorFactories);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+ DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
+ IBinaryComparatorFactory[] deletedKeyBTreeCompFactories =
+ getMergedComparatorFactories(comparatorFactories, primaryComparatorFactories);
IIndexDataflowHelperFactory idff = new LSMRTreeWithAntiMatterTuplesDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, deletedKeyBTreeCompFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -2922,7 +2922,7 @@
ResourceType.LSM_BTREE)
: new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT,
- ResourceType.LSM_BTREE);
+ ResourceType.LSM_BTREE, dataset.hasMetaPart());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtils.getMergePolicyFactory(dataset, mdTxnCtx);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 65c9a49..2a3467e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -40,7 +40,6 @@
protected final byte resourceType;
protected final IndexOperation indexOp;
protected final ITransactionSubsystem txnSubsystem;
- protected final SimpleTupleWriter tupleWriter;
protected final ILogRecord logRecord;
protected AbstractIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
@@ -51,7 +50,6 @@
this.resourceType = resourceType;
this.indexOp = indexOp;
this.txnSubsystem = txnSubsystem;
- tupleWriter = new SimpleTupleWriter();
logRecord = new LogRecord();
logRecord.setTxnCtx(txnCtx);
logRecord.setLogType(LogType.UPDATE);
@@ -62,17 +60,23 @@
logRecord.setNewOp((byte) (indexOp.ordinal()));
}
- protected void log(int PKHash, ITupleReference newValue) throws ACIDException {
+ protected void log(int PKHash, ITupleReference newValue, ITupleReference oldValue) throws ACIDException {
logRecord.setPKHashValue(PKHash);
logRecord.setPKFields(primaryKeyFields);
logRecord.setPKValue(newValue);
logRecord.computeAndSetPKValueSize();
if (newValue != null) {
- logRecord.setNewValueSize(tupleWriter.bytesRequired(newValue));
+ logRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue));
logRecord.setNewValue(newValue);
} else {
logRecord.setNewValueSize(0);
}
+ if (oldValue != null) {
+ logRecord.setOldValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(oldValue));
+ logRecord.setOldValue(oldValue);
+ } else {
+ logRecord.setOldValueSize(0);
+ }
logRecord.computeAndSetLogSize();
txnSubsystem.getLogManager().log(logRecord);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 4bde490..5b89bb5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -40,13 +40,16 @@
implements IModificationOperationCallback {
private final AsterixLSMInsertDeleteOperatorNodePushable operatorNodePushable;
+ private final boolean logBeforeImage;
public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
- byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable) {
+ byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable,
+ boolean logBeforeImage) {
super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
resourceType, indexOp);
this.operatorNodePushable = (AsterixLSMInsertDeleteOperatorNodePushable) operatorNodePushable;
+ this.logBeforeImage = logBeforeImage;
}
@Override
@@ -99,7 +102,11 @@
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
try {
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
- log(pkHash, after);
+ if (logBeforeImage) {
+ log(pkHash, after, before);
+ } else {
+ log(pkHash, after, null);
+ }
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index c406812..52e2818 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -43,11 +43,14 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
+ private final boolean logBeforeImage;
public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
+ boolean logBeforeImage) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
+ this.logBeforeImage = logBeforeImage;
}
@Override
@@ -56,8 +59,8 @@
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getDatasetLifecycleManager();
+ IIndexLifecycleManager indexLifeCycleManager =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourcePath);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
@@ -67,7 +70,7 @@
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
- resourcePartition, resourceType, indexOp, operatorNodePushable);
+ resourcePartition, resourceType, indexOp, operatorNodePushable, logBeforeImage);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 8044d90..974e631 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -37,13 +37,15 @@
implements IModificationOperationCallback {
protected final IndexOperation oldOp;
+ private final boolean logBeforeImage;
public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
- int resourcePartition, byte resourceType, IndexOperation indexOp) {
+ int resourcePartition, byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
resourceType, indexOp);
oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE;
+ this.logBeforeImage = logBeforeImage;
}
@Override
@@ -55,7 +57,7 @@
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
try {
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
- this.log(pkHash, after);
+ this.log(pkHash, after, logBeforeImage ? before : null);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 168da99..c6743dd 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -40,11 +40,14 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
+ private final boolean logBeforeImage;
public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
+ boolean logBeforeImage) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
+ this.logBeforeImage = logBeforeImage;
}
@Override
@@ -63,7 +66,7 @@
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId,
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
- resourcePartition, resourceType, indexOp);
+ resourcePartition, resourceType, indexOp, logBeforeImage);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
return modCallback;
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index f98083a..13d2d57 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -29,12 +29,14 @@
public class UpsertOperationCallback extends AbstractIndexModificationOperationCallback
implements IModificationOperationCallback {
+ private final boolean logBeforeImage;
public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
- byte resourceType, IndexOperation indexOp) {
+ byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
resourceType, indexOp);
+ this.logBeforeImage = logBeforeImage;
}
@Override
@@ -46,7 +48,7 @@
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
try {
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
- log(pkHash, after);
+ log(pkHash, after, logBeforeImage ? before : null);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 87cb8e7..5bf1505 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -39,11 +39,14 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
+ private final boolean logBeforeImage;
public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
+ boolean logBeforeImage) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
+ this.logBeforeImage = logBeforeImage;
}
@Override
@@ -52,8 +55,8 @@
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getDatasetLifecycleManager();
+ IIndexLifecycleManager indexLifeCycleManager =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
@@ -61,9 +64,9 @@
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
- IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId, primaryKeyFields,
- txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourcePartition, resourceType,
- indexOp);
+ IModificationOperationCallback modCallback =
+ new UpsertOperationCallback(datasetId, primaryKeyFields, txnCtx, txnSubsystem.getLockManager(),
+ txnSubsystem, resourceId, resourcePartition, resourceType, indexOp, logBeforeImage);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index f474690..7f5c612 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -199,7 +199,7 @@
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-plugin-plugin</artifactId>
- <versionRange>[3.4,)</versionRange>
+ <versionRange>[3.3,)</versionRange>
<goals>
<goal>descriptor</goal>
</goals>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
index 11b0010..7aaa983 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriter.java
@@ -22,11 +22,18 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+/*
+ * This class should be replaced by a Util class
+ */
public class SimpleTupleWriter implements ITreeIndexTupleWriter {
+ public static final SimpleTupleWriter INSTANCE = new SimpleTupleWriter();
+
+ private SimpleTupleWriter() {
+ }
+
// Write short in little endian to target byte array at given offset.
private static void writeShortL(short s, byte[] buf, int targetOff) {
buf[targetOff] = (byte) (s >> 8);
@@ -52,7 +59,7 @@
}
@Override
- public ITreeIndexTupleReference createTupleReference() {
+ public SimpleTupleReference createTupleReference() {
return new SimpleTupleReference();
}
@@ -103,7 +110,7 @@
}
protected int getNullFlagsBytes(ITupleReference tuple) {
- return (int) Math.ceil((double) tuple.getFieldCount() / 8.0);
+ return (int) Math.ceil(tuple.getFieldCount() / 8.0);
}
protected int getFieldSlotsBytes(ITupleReference tuple) {
@@ -111,7 +118,7 @@
}
protected int getNullFlagsBytes(ITupleReference tuple, int startField, int numFields) {
- return (int) Math.ceil((double) numFields / 8.0);
+ return (int) Math.ceil(numFields / 8.0);
}
protected int getFieldSlotsBytes(ITupleReference tuple, int startField, int numFields) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
deleted file mode 100644
index be0688a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/SimpleTupleWriterFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.storage.am.common.tuples;
-
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriterFactory;
-
-public class SimpleTupleWriterFactory implements ITreeIndexTupleWriterFactory {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ITreeIndexTupleWriter createTupleWriter() {
- return new SimpleTupleWriter();
- }
-
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b58cc29..8ddab88 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -511,8 +511,8 @@
protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
throws HyracksDataException {
- ILSMIndexAccessorInternal accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ ILSMIndexAccessorInternal accessor =
+ lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleReplication(lsmComponents, bulkload, opType);
}