[ASTERIXDB-1952][TX][IDX] Filter logs pt.2
- user model changes: no
- storage format changes: yes
- interface changes: yes
Details:
- Add a log type specifically for filters
- Only log change when filter actually widens
- Stop logging of index + filter tuple during modification
- Redo index and filter tuples separately via their logs
Change-Id: Ie9e7795d9c8c212e8610dcb9bb5d26ec9fbbee8a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1857
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 74277ce..4a2cf2d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -77,6 +77,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IIndex;
@@ -221,6 +222,7 @@
case LogType.FLUSH:
case LogType.WAIT:
case LogType.MARKER:
+ case LogType.FILTER:
break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -315,59 +317,64 @@
foundWinner = true;
}
}
- if (foundWinner) {
- resourceId = logRecord.getResourceId();
- localResource = resourcesMap.get(resourceId);
- /*******************************************************************
- * [Notice]
- * -> Issue
- * Delete index may cause a problem during redo.
- * The index operation to be redone couldn't be redone because the corresponding index
- * may not exist in NC due to the possible index drop DDL operation.
- * -> Approach
- * Avoid the problem during redo.
- * More specifically, the problem will be detected when the localResource of
- * the corresponding index is retrieved, which will end up with 'null'.
- * If null is returned, then just go and process the next
- * log record.
- *******************************************************************/
- if (localResource == null) {
- LOGGER.log(Level.WARN, "resource was not found for resource id " + resourceId);
- logRecord = logReader.next();
- continue;
- }
- /*******************************************************************/
+ if (!foundWinner) {
+ break;
+ }
+ }
+ //fall through as FILTER is a subset of UPDATE
+ case LogType.FILTER:
+ if (partitions.contains(logRecord.getResourcePartition())) {
+ resourceId = logRecord.getResourceId();
+ localResource = resourcesMap.get(resourceId);
+ /*******************************************************************
+ * [Notice]
+ * -> Issue
+ * Delete index may cause a problem during redo.
+ * The index operation to be redone couldn't be redone because the corresponding index
+ * may not exist in NC due to the possible index drop DDL operation.
+ * -> Approach
+ * Avoid the problem during redo.
+ * More specifically, the problem will be detected when the localResource of
+ * the corresponding index is retrieved, which will end up with 'null'.
+ * If null is returned, then just go and process the next
+ * log record.
+ *******************************************************************/
+ if (localResource == null) {
+ LOGGER.log(Level.WARN, "resource was not found for resource id " + resourceId);
+ logRecord = logReader.next();
+ continue;
+ }
+ /*******************************************************************/
- //get index instance from IndexLifeCycleManager
- //if index is not registered into IndexLifeCycleManager,
- //create the index using LocalMetadata stored in LocalResourceRepository
- //get partition path in this node
- localResourceMetadata = (DatasetLocalResource) localResource.getResource();
- index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
- if (index == null) {
- //#. create index instance and register to indexLifeCycleManager
- index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
- datasetLifecycleManager.register(localResource.getPath(), index);
- datasetLifecycleManager.open(localResource.getPath());
- try {
- final DatasetResourceReference resourceReference =
- DatasetResourceReference.of(localResource);
- maxDiskLastLsn =
- indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
- } catch (HyracksDataException e) {
- datasetLifecycleManager.close(localResource.getPath());
- throw e;
- }
- //#. set resourceId and maxDiskLastLSN to the map
- resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
- } else {
- maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ //get index instance from IndexLifeCycleManager
+ //if index is not registered into IndexLifeCycleManager,
+ //create the index using LocalMetadata stored in LocalResourceRepository
+ //get partition path in this node
+ localResourceMetadata = (DatasetLocalResource) localResource.getResource();
+ index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
+ if (index == null) {
+ //#. create index instance and register to indexLifeCycleManager
+ index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
+ datasetLifecycleManager.register(localResource.getPath(), index);
+ datasetLifecycleManager.open(localResource.getPath());
+ try {
+ final DatasetResourceReference resourceReference =
+ DatasetResourceReference.of(localResource);
+ maxDiskLastLsn =
+ indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+ } catch (HyracksDataException e) {
+ datasetLifecycleManager.close(localResource.getPath());
+ throw e;
}
- // lsn @ maxDiskLastLsn is either a flush log or a master replica log
- if (lsn >= maxDiskLastLsn) {
- redo(logRecord, datasetLifecycleManager);
- redoCount++;
- }
+ //#. set resourceId and maxDiskLastLSN to the map
+ resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+ } else {
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ }
+ // lsn @ maxDiskLastLsn is either a flush log or a master replica log
+ if (lsn >= maxDiskLastLsn) {
+ redo(logRecord, datasetLifecycleManager);
+ redoCount++;
}
}
break;
@@ -659,6 +666,7 @@
throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
case LogType.ABORT:
case LogType.FLUSH:
+ case LogType.FILTER:
case LogType.WAIT:
case LogType.MARKER:
//ignore
@@ -741,6 +749,9 @@
// undo, upsert the old value if found, otherwise, physical delete
undoUpsertOrDelete(indexAccessor, logRecord);
break;
+ case AbstractIndexModificationOperationCallback.FILTER_BYTE:
+ //do nothing, can't undo filters
+ break;
default:
throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
}
@@ -775,6 +786,9 @@
long resourceId = logRecord.getResourceId();
ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId);
ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ ILSMIndexOperationContext opCtx = indexAccessor.getOpContext();
+ opCtx.setFilterSkip(true);
+ opCtx.setRecovery(true);
if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) {
indexAccessor.forceInsert(logRecord.getNewValue());
} else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) {
@@ -782,6 +796,9 @@
} else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
// redo, upsert the new value
indexAccessor.forceUpsert(logRecord.getNewValue());
+ } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.FILTER_BYTE) {
+ opCtx.setFilterSkip(false);
+ indexAccessor.updateFilter(logRecord.getNewValue());
} else {
throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index e58a6fa..04f9751 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -50,121 +50,125 @@
int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES;
int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES;
- int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+ int ENTITY_RESOURCE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES;
+ int ENTITY_VALUE_HEADER_LEN = PKHASH_LEN + PKSZ_LEN;
int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
- int ENTITY_COMMIT_LOG_BASE_SIZE = ALL_RECORD_HEADER_LEN + ENTITYCOMMIT_UPDATE_HEADER_LEN + CHKSUM_LEN;
+ int ENTITY_COMMIT_LOG_BASE_SIZE =
+ ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + ENTITY_VALUE_HEADER_LEN + CHKSUM_LEN;
int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER;
+ int FILTER_LOG_BASE_SIZE =
+ ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + UPDATE_BODY_HEADER + UPDATE_LSN_HEADER + CHKSUM_LEN;
int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + FLUSHING_COMPONENT_MINID_LEN
+ FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN;
int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
int MARKER_BASE_LOG_SIZE =
ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
- public RecordReadStatus readLogRecord(ByteBuffer buffer);
+ RecordReadStatus readLogRecord(ByteBuffer buffer);
- public void writeLogRecord(ByteBuffer buffer);
+ void writeLogRecord(ByteBuffer buffer);
- public ITransactionContext getTxnCtx();
+ ITransactionContext getTxnCtx();
- public void setTxnCtx(ITransactionContext txnCtx);
+ void setTxnCtx(ITransactionContext txnCtx);
- public boolean isFlushed();
+ boolean isFlushed();
- public void isFlushed(boolean isFlushed);
+ void isFlushed(boolean isFlushed);
- public byte getLogType();
+ byte getLogType();
- public void setLogType(byte logType);
+ void setLogType(byte logType);
long getTxnId();
void setTxnId(long jobId);
- public int getDatasetId();
+ int getDatasetId();
- public void setDatasetId(int datasetId);
+ void setDatasetId(int datasetId);
- public int getPKHashValue();
+ int getPKHashValue();
- public void setPKHashValue(int PKHashValue);
+ void setPKHashValue(int PKHashValue);
- public long getResourceId();
+ long getResourceId();
- public void setResourceId(long resourceId);
+ void setResourceId(long resourceId);
- public int getLogSize();
+ int getLogSize();
- public void setLogSize(int logSize);
+ void setLogSize(int logSize);
- public byte getNewOp();
+ byte getNewOp();
- public void setNewOp(byte newOp);
+ void setNewOp(byte newOp);
- public void setNewValueSize(int newValueSize);
+ void setNewValueSize(int newValueSize);
- public ITupleReference getNewValue();
+ ITupleReference getNewValue();
- public void setNewValue(ITupleReference newValue);
+ void setNewValue(ITupleReference newValue);
- public long getChecksum();
+ long getChecksum();
- public void setChecksum(long checksum);
+ void setChecksum(long checksum);
- public long getLSN();
+ long getLSN();
- public void setLSN(long LSN);
+ void setLSN(long LSN);
- public String getLogRecordForDisplay();
+ String getLogRecordForDisplay();
- public void computeAndSetLogSize();
+ void computeAndSetLogSize();
- public int getPKValueSize();
+ int getPKValueSize();
- public ITupleReference getPKValue();
+ ITupleReference getPKValue();
- public void setPKFields(int[] primaryKeyFields);
+ void setPKFields(int[] primaryKeyFields);
- public void computeAndSetPKValueSize();
+ void computeAndSetPKValueSize();
- public void setPKValue(ITupleReference PKValue);
+ void setPKValue(ITupleReference PKValue);
- public void readRemoteLog(ByteBuffer buffer);
+ void readRemoteLog(ByteBuffer buffer);
- public void setLogSource(byte logSource);
+ void setLogSource(byte logSource);
- public byte getLogSource();
+ byte getLogSource();
- public int getRemoteLogSize();
+ int getRemoteLogSize();
- public int getResourcePartition();
+ int getResourcePartition();
- public void setResourcePartition(int resourcePartition);
+ void setResourcePartition(int resourcePartition);
- public void setReplicated(boolean replicated);
+ void setReplicated(boolean replicated);
/**
* @return a flag indicating whether the log was replicated
*/
- public boolean isReplicated();
+ boolean isReplicated();
- public void writeRemoteLogRecord(ByteBuffer buffer);
+ void writeRemoteLogRecord(ByteBuffer buffer);
- public ITupleReference getOldValue();
+ ITupleReference getOldValue();
- public void setOldValue(ITupleReference tupleBefore);
+ void setOldValue(ITupleReference tupleBefore);
- public void setOldValueSize(int beforeSize);
+ void setOldValueSize(int beforeSize);
- public boolean isMarker();
+ boolean isMarker();
- public ByteBuffer getMarker();
+ ByteBuffer getMarker();
- public void logAppended(long lsn);
+ void logAppended(long lsn);
- public long getPreviousMarkerLSN();
+ long getPreviousMarkerLSN();
/**
* Sets flag indicating if this log should be replicated or not
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index d85fd70..5fdb4e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -24,6 +24,7 @@
import java.util.zip.CRC32;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -36,18 +37,20 @@
* LogType(1)
* TxnId(8)
* ---------------------------
- * [Header2] (16 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types
+ * [Header2] (8 bytes) : for entity_commit, upsert_entity_commit, filter and update log types
* DatasetId(4) //stored in dataset_dataset in Metadata Node
* ResourcePartition(4)
+ * ---------------------------
+ * [Header3] (8 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types
* PKHashValue(4)
* PKValueSize(4)
* PKValue(PKValueSize)
* ---------------------------
- * [Header3] (12 bytes) : only for update log type
+ * [Header4] (12 bytes) : only for update, filter log type
* ResourceId(8) //stored in .metadata of the corresponding index in NC node
* LogRecordSize(4)
* ---------------------------
- * [Body] (9 bytes + NewValueSize) : only for update log type
+ * [Body] (9 bytes + NewValueSize) : only for update, filter log type
* FieldCnt(4)
* NewOp(1)
* NewValueSize(4)
@@ -57,7 +60,6 @@
* Checksum(8)
* ---------------------------
*/
-
public class LogRecord implements ILogRecord {
// ------------- fields in a log record (begin) ------------//
@@ -125,10 +127,12 @@
buffer.putLong(txnId);
switch (logType) {
case LogType.ENTITY_COMMIT:
- writeEntityInfo(buffer);
+ writeEntityResource(buffer);
+ writeEntityValue(buffer);
break;
case LogType.UPDATE:
- writeEntityInfo(buffer);
+ writeEntityResource(buffer);
+ writeEntityValue(buffer);
buffer.putLong(resourceId);
buffer.putInt(logSize);
buffer.putInt(newValueFieldCount);
@@ -141,6 +145,15 @@
writeTuple(buffer, oldValue, oldValueSize);
}
break;
+ case LogType.FILTER:
+ writeEntityResource(buffer);
+ buffer.putLong(resourceId);
+ buffer.putInt(logSize);
+ buffer.putInt(newValueFieldCount);
+ buffer.put(newOp);
+ buffer.putInt(newValueSize);
+ writeTuple(buffer, newValue, newValueSize);
+ break;
case LogType.FLUSH:
buffer.putInt(datasetId);
buffer.putInt(resourcePartition);
@@ -159,9 +172,7 @@
}
}
- private void writeEntityInfo(ByteBuffer buffer) {
- buffer.putInt(resourcePartition);
- buffer.putInt(datasetId);
+ private void writeEntityValue(ByteBuffer buffer) {
buffer.putInt(PKHashValue);
if (PKValueSize <= 0) {
throw new IllegalStateException("Primary Key Size is less than or equal to 0");
@@ -170,6 +181,11 @@
writePKValue(buffer);
}
+ private void writeEntityResource(ByteBuffer buffer) {
+ buffer.putInt(resourcePartition);
+ buffer.putInt(datasetId);
+ }
+
@Override
public void writeLogRecord(ByteBuffer buffer) {
int beginOffset = buffer.position();
@@ -264,51 +280,18 @@
computeAndSetLogSize();
break;
case LogType.ENTITY_COMMIT:
- if (readEntityInfo(buffer)) {
+ if (readEntityResource(buffer) && readEntityValue(buffer)) {
computeAndSetLogSize();
} else {
return RecordReadStatus.TRUNCATED;
}
break;
case LogType.UPDATE:
- if (readEntityInfo(buffer)) {
- if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
- return RecordReadStatus.TRUNCATED;
- }
- resourceId = buffer.getLong();
- logSize = buffer.getInt();
- newValueFieldCount = buffer.getInt();
- newOp = buffer.get();
- newValueSize = buffer.getInt();
- if (buffer.remaining() < newValueSize) {
- if (logSize > buffer.capacity()) {
- return RecordReadStatus.LARGE_RECORD;
- }
- return RecordReadStatus.TRUNCATED;
- }
- newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
- if (logSize > getUpdateLogSizeWithoutOldValue()) {
- // Prev Image exists
- if (buffer.remaining() < Integer.BYTES) {
- return RecordReadStatus.TRUNCATED;
- }
- oldValueSize = buffer.getInt();
- if (buffer.remaining() < Integer.BYTES) {
- return RecordReadStatus.TRUNCATED;
- }
- oldValueFieldCount = buffer.getInt();
- if (buffer.remaining() < oldValueSize) {
- return RecordReadStatus.TRUNCATED;
- }
- oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
- } else {
- oldValueSize = 0;
- oldValue = null;
- }
+ if (readEntityResource(buffer) && readEntityValue(buffer)) {
+ return readUpdateInfo(buffer);
} else {
return RecordReadStatus.TRUNCATED;
}
- break;
case LogType.MARKER:
if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN) {
return RecordReadStatus.TRUNCATED;
@@ -331,19 +314,23 @@
marker.position(lenRemaining);
marker.flip();
break;
+ case LogType.FILTER:
+ if (readEntityResource(buffer)) {
+ return readUpdateInfo(buffer);
+ } else {
+ return RecordReadStatus.TRUNCATED;
+ }
default:
break;
}
return RecordReadStatus.OK;
}
- private boolean readEntityInfo(ByteBuffer buffer) {
+ private boolean readEntityValue(ByteBuffer buffer) {
//attempt to read in the resourcePartition, dsid, PK hash and PK length
- if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
+ if (buffer.remaining() < ENTITY_VALUE_HEADER_LEN) {
return false;
}
- resourcePartition = buffer.getInt();
- datasetId = buffer.getInt();
PKHashValue = buffer.getInt();
PKValueSize = buffer.getInt();
// attempt to read in the PK
@@ -357,6 +344,53 @@
return true;
}
+ private boolean readEntityResource(ByteBuffer buffer) {
+ //attempt to read in the resourcePartition and dsid
+ if (buffer.remaining() < ENTITY_RESOURCE_HEADER_LEN) {
+ return false;
+ }
+ resourcePartition = buffer.getInt();
+ datasetId = buffer.getInt();
+ return true;
+ }
+
+ private RecordReadStatus readUpdateInfo(ByteBuffer buffer) {
+ if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ resourceId = buffer.getLong();
+ logSize = buffer.getInt();
+ newValueFieldCount = buffer.getInt();
+ newOp = buffer.get();
+ newValueSize = buffer.getInt();
+ if (buffer.remaining() < newValueSize) {
+ if (logSize > buffer.capacity()) {
+ return RecordReadStatus.LARGE_RECORD;
+ }
+ return RecordReadStatus.TRUNCATED;
+ }
+ newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+ if (logSize > getUpdateLogSizeWithoutOldValue()) {
+ // Prev Image exists
+ if (buffer.remaining() < Integer.BYTES) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValueSize = buffer.getInt();
+ if (buffer.remaining() < Integer.BYTES) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValueFieldCount = buffer.getInt();
+ if (buffer.remaining() < oldValueSize) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+ } else {
+ oldValueSize = 0;
+ oldValue = null;
+ }
+ return RecordReadStatus.OK;
+ }
+
@Override
public void readRemoteLog(ByteBuffer buffer) {
//read common fields
@@ -403,6 +437,10 @@
}
}
+ private int getFilterLogSize() {
+ return FILTER_LOG_BASE_SIZE + newValueSize;
+ }
+
private int getUpdateLogSizeWithoutOldValue() {
return UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize;
}
@@ -426,6 +464,9 @@
case LogType.WAIT:
logSize = WAIT_LOG_SIZE;
break;
+ case LogType.FILTER:
+ logSize = getFilterLogSize();
+ break;
case LogType.MARKER:
setMarkerLogSize();
break;
@@ -499,8 +540,8 @@
}
@Override
- public void setTxnId(long jobId) {
- this.txnId = jobId;
+ public void setTxnId(long txnId) {
+ this.txnId = txnId;
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 11c45ad..f02b0de 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -26,7 +26,8 @@
public static final byte ABORT = 3;
public static final byte FLUSH = 4;
public static final byte WAIT = 6;
- public static final byte MARKER = 7;
+ public static final byte FILTER = 7;
+ public static final byte MARKER = 8;
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -34,6 +35,7 @@
private static final String STRING_ABORT = "ABORT";
private static final String STRING_FLUSH = "FLUSH";
private static final String STRING_WAIT = "WAIT";
+ private static final String STRING_FILTER = "FILTER";
private static final String STRING_MARKER = "MARKER";
private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
@@ -51,6 +53,8 @@
return STRING_FLUSH;
case LogType.WAIT:
return STRING_WAIT;
+ case LogType.FILTER:
+ return STRING_FILTER;
case LogType.MARKER:
return STRING_MARKER;
default:
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java
index 19536f6..9b749fa 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
@@ -45,6 +46,8 @@
private IndexOperation op;
private LSMIOOperationType ioOperationType;
private ILSMDiskComponent newComponent;
+ private boolean filterSkip = false;
+ private boolean isRecovery = false;
public TestLSMIndexOperationContext(ILSMIndex index) {
this.index = index;
@@ -89,7 +92,7 @@
}
@Override
- public IModificationOperationCallback getModificationCallback() {
+ public IExtendedModificationOperationCallback getModificationCallback() {
return NoOpOperationCallback.INSTANCE;
}
@@ -156,6 +159,27 @@
}
@Override
+ public boolean isFilterSkipped() {
+ return filterSkip;
+ }
+
+ @Override
+ public void setFilterSkip(boolean skip) {
+ this.filterSkip = skip;
+ }
+
+ @Override
+ public boolean isRecovery() {
+ return isRecovery;
+ }
+
+ @Override
+ public void setRecovery(boolean recovery) {
+ this.isRecovery = recovery;
+
+ }
+
+ @Override
public LSMIOOperationType getIoOperationType() {
return ioOperationType;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
index 6189e37..b094d9e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
@@ -53,6 +53,7 @@
switch (reusableLog.getLogType()) {
case LogType.UPDATE:
case LogType.ENTITY_COMMIT:
+ case LogType.FILTER:
logManager.log(reusableLog);
break;
case LogType.JOB_COMMIT:
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 3da9e83..8746fba 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -29,20 +29,24 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
public abstract class AbstractIndexModificationOperationCallback extends AbstractOperationCallback
- implements IModificationOperationCallback {
+ implements IExtendedModificationOperationCallback {
public static final byte INSERT_BYTE = 0x01;
public static final byte DELETE_BYTE = 0x02;
public static final byte UPSERT_BYTE = 0x03;
+ public static final byte FILTER_BYTE = 0x04;
public enum Operation {
INSERT(INSERT_BYTE),
DELETE(DELETE_BYTE),
- UPSERT(UPSERT_BYTE);
+ UPSERT(UPSERT_BYTE),
+ FILTER_MOD(FILTER_BYTE);
+
private byte value;
Operation(byte value) {
@@ -59,6 +63,8 @@
return DELETE;
case INSERT:
return INSERT;
+ case FILTER_MOD:
+ return FILTER_MOD;
case UPSERT:
return UPSERT;
default:
@@ -71,7 +77,8 @@
protected final byte resourceType;
protected final Operation indexOp;
protected final ITransactionSubsystem txnSubsystem;
- protected final ILogRecord logRecord;
+ protected final ILogRecord indexRecord;
+ protected final ILogRecord filterRecord;
protected AbstractIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields,
ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
@@ -80,35 +87,52 @@
this.resourceType = resourceType;
this.indexOp = indexOp;
this.txnSubsystem = txnSubsystem;
- logRecord = new LogRecord();
- logRecord.setTxnCtx(txnCtx);
- logRecord.setLogType(LogType.UPDATE);
- logRecord.setTxnId(txnCtx.getTxnId().getId());
- logRecord.setDatasetId(datasetId.getId());
- logRecord.setResourceId(resourceId);
- logRecord.setResourcePartition(resourcePartition);
- logRecord.setNewOp(indexOp.value());
+ indexRecord = new LogRecord();
+ indexRecord.setTxnCtx(txnCtx);
+ indexRecord.setLogType(LogType.UPDATE);
+ indexRecord.setTxnId(txnCtx.getTxnId().getId());
+ indexRecord.setDatasetId(datasetId.getId());
+ indexRecord.setResourceId(resourceId);
+ indexRecord.setResourcePartition(resourcePartition);
+ indexRecord.setNewOp(indexOp.value());
+ filterRecord = new LogRecord();
+ filterRecord.setTxnCtx(txnCtx);
+ filterRecord.setLogType(LogType.FILTER);
+ filterRecord.setDatasetId(datasetId.getId());
+ filterRecord.setTxnId(txnCtx.getTxnId().getId());
+ filterRecord.setResourceId(resourceId);
+ filterRecord.setResourcePartition(resourcePartition);
+ filterRecord.setNewOp(Operation.FILTER_MOD.value());
}
protected void log(int PKHash, ITupleReference newValue, ITupleReference oldValue) throws ACIDException {
- logRecord.setPKHashValue(PKHash);
- logRecord.setPKFields(primaryKeyFields);
- logRecord.setPKValue(newValue);
- logRecord.computeAndSetPKValueSize();
+ indexRecord.setPKHashValue(PKHash);
+ indexRecord.setPKFields(primaryKeyFields);
+ indexRecord.setPKValue(newValue);
+ indexRecord.computeAndSetPKValueSize();
if (newValue != null) {
- logRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue));
- logRecord.setNewValue(newValue);
+ indexRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue));
+ indexRecord.setNewValue(newValue);
} else {
- logRecord.setNewValueSize(0);
+ indexRecord.setNewValueSize(0);
}
if (oldValue != null) {
- logRecord.setOldValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(oldValue));
- logRecord.setOldValue(oldValue);
+ indexRecord.setOldValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(oldValue));
+ indexRecord.setOldValue(oldValue);
} else {
- logRecord.setOldValueSize(0);
+ indexRecord.setOldValueSize(0);
}
- logRecord.computeAndSetLogSize();
- txnSubsystem.getLogManager().log(logRecord);
+ indexRecord.computeAndSetLogSize();
+ txnSubsystem.getLogManager().log(indexRecord);
+ }
+
+ public void after(ITupleReference newValue) throws HyracksDataException {
+ if (newValue != null) {
+ filterRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue));
+ filterRecord.setNewValue(newValue);
+ filterRecord.computeAndSetLogSize();
+ txnSubsystem.getLogManager().log(filterRecord);
+ }
}
/**
@@ -116,9 +140,8 @@
* a single operator to perform different operations per tuple
*
* @param op
- * @throws HyracksDataException
*/
- public void setOp(Operation op) throws HyracksDataException {
- logRecord.setNewOp(op.value());
+ public void setOp(Operation op) {
+ indexRecord.setNewOp(op.value());
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 2c8079d..3e41264 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -104,10 +104,10 @@
}
private void logWait() throws ACIDException {
- logRecord.setLogType(LogType.WAIT);
- logRecord.computeAndSetLogSize();
- txnSubsystem.getLogManager().log(logRecord);
+ indexRecord.setLogType(LogType.WAIT);
+ indexRecord.computeAndSetLogSize();
+ txnSubsystem.getLogManager().log(indexRecord);
// set the log type back to UPDATE for normal updates
- logRecord.setLogType(LogType.UPDATE);
+ indexRecord.setLogType(LogType.UPDATE);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index a1aec1a..1e13883 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -48,6 +48,7 @@
case LogType.ENTITY_COMMIT:
case LogType.UPDATE:
case LogType.FLUSH:
+ case LogType.FILTER:
shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId());
if (shouldReplicate && !replicatedTxn.contains(logRecord.getTxnId())) {
replicatedTxn.add(logRecord.getTxnId());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 077a006..fb8770e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -300,7 +300,7 @@
}
// fall-through
case SUFFICIENT_CONTIGUOUS_SPACE: {
- foundModCallback(ctx, null, tuple);
+ ctx.getModificationCallback().found(null, tuple);
ctx.getLeafFrame().insert(tuple, targetTupleIndex);
ctx.getSplitKey().reset();
break;
@@ -308,7 +308,7 @@
case SUFFICIENT_SPACE: {
int finalIndex = ctx.getLeafFrame().compact() ? ctx.getLeafFrame().findInsertTupleIndex(tuple)
: targetTupleIndex;
- foundModCallback(ctx, null, tuple);
+ ctx.getModificationCallback().found(null, tuple);
ctx.getLeafFrame().insert(tuple, finalIndex);
ctx.getSplitKey().reset();
break;
@@ -317,7 +317,7 @@
// Try compressing the page first and see if there is space available.
if (ctx.getLeafFrame().compress()
&& ctx.getLeafFrame().hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
- foundModCallback(ctx, null, tuple);
+ ctx.getModificationCallback().found(null, tuple);
ctx.getLeafFrame().insert(tuple, ctx.getLeafFrame().findInsertTupleIndex(tuple));
ctx.getSplitKey().reset();
} else {
@@ -360,10 +360,10 @@
// Perform an update (delete + insert) if the updateTupleIndex != -1
if (updateTupleIndex != -1) {
ITupleReference beforeTuple = ctx.getLeafFrame().getMatchingKeyTuple(tuple, updateTupleIndex);
- foundModCallback(ctx, beforeTuple, tuple);
+ ctx.getModificationCallback().found(beforeTuple, tuple);
ctx.getLeafFrame().delete(tuple, updateTupleIndex);
} else {
- foundModCallback(ctx, null, tuple);
+ ctx.getModificationCallback().found(null, tuple);
}
ctx.getLeafFrame().split(rightFrame, tuple, ctx.getSplitKey(), ctx, bufferCache);
@@ -398,7 +398,7 @@
boolean restartOp = false;
switch (spaceStatus) {
case SUFFICIENT_INPLACE_SPACE: {
- foundModCallback(ctx, beforeTuple, tuple);
+ ctx.getModificationCallback().found(beforeTuple, tuple);
ctx.getLeafFrame().update(tuple, oldTupleIndex, true);
ctx.getSplitKey().reset();
break;
@@ -407,7 +407,7 @@
// TODO: avoid repeated calculation of tuple size
// TODO: in-place update on expand
// Delete the old tuple, compact the frame, and insert the new tuple.
- foundModCallback(ctx, beforeTuple, tuple);
+ ctx.getModificationCallback().found(beforeTuple, tuple);
ctx.getLeafFrame().delete(tuple, oldTupleIndex);
ctx.getLeafFrame().compact();
ctx.getLeafFrame().ensureCapacity(bufferCache, tuple, ctx);
@@ -417,14 +417,14 @@
break;
}
case SUFFICIENT_CONTIGUOUS_SPACE: {
- foundModCallback(ctx, beforeTuple, tuple);
+ ctx.getModificationCallback().found(beforeTuple, tuple);
ctx.getLeafFrame().update(tuple, oldTupleIndex, false);
ctx.getSplitKey().reset();
break;
}
case SUFFICIENT_SPACE: {
// Delete the old tuple, compact the frame, and insert the new tuple.
- foundModCallback(ctx, beforeTuple, tuple);
+ ctx.getModificationCallback().found(beforeTuple, tuple);
ctx.getLeafFrame().delete(tuple, oldTupleIndex);
ctx.getLeafFrame().compact();
int targetTupleIndex = ctx.getLeafFrame().findInsertTupleIndex(tuple);
@@ -759,12 +759,6 @@
modificationCallback, searchCallback);
}
- private BTreeOpContext createOpContext(IIndexAccessor accessor, IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback, int[] logTupleFields) {
- return new BTreeOpContext(accessor, leafFrameFactory, interiorFrameFactory, freePageManager, cmpFactories,
- modificationCallback, searchCallback, logTupleFields);
- }
-
@SuppressWarnings("rawtypes")
public String printTree(IBTreeLeafFrame leafFrame, IBTreeInteriorFrame interiorFrame,
ISerializerDeserializer[] keySerdes) throws Exception {
@@ -824,11 +818,6 @@
return new BTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback());
}
- public BTreeAccessor createAccessor(IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback, int[] logTupleFields) {
- return new BTreeAccessor(this, modificationCallback, searchCallback, logTupleFields);
- }
-
// TODO: Class should be private. But currently we need to expose the
// setOpContext() API to the LSM Tree for it to work correctly.
@@ -849,12 +838,6 @@
this.ctx = btree.createOpContext(this, modificationCalback, searchCallback);
}
- public BTreeAccessor(BTree btree, IModificationOperationCallback modificationCalback,
- ISearchOperationCallback searchCallback, int[] logTupleFields) {
- this.btree = btree;
- this.ctx = btree.createOpContext(this, modificationCalback, searchCallback, logTupleFields);
- }
-
public void reset(BTree btree, IModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback) {
this.btree = btree;
@@ -1251,14 +1234,4 @@
public int getNumOfFilterFields() {
return 0;
}
-
- private void foundModCallback(BTreeOpContext ctx, ITupleReference before, ITupleReference after)
- throws HyracksDataException {
- if (ctx.getTupleWithNonIndexFields() == null) {
- ctx.getModificationCallback().found(before, after);
- } else {
- ctx.getModificationCallback().found(before, ctx.getTupleWithNonIndexFields());
- }
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java
index 96370fe..c082ad7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java
@@ -39,7 +39,6 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -57,7 +56,6 @@
private final IBTreeInteriorFrame interiorFrame;
private final IPageManager freePageManager;
private final ITreeIndexMetadataFrame metaFrame;
- private PermutingTupleReference tupleWithNonIndexFields; // Optional, for filtered LSM Index transaction support
private ITreeIndexFrameFactory leafFrameFactory;
private IBTreeLeafFrame leafFrame;
private IndexOperation op;
@@ -115,15 +113,6 @@
this.interiorFrameTuple = getInteriorFrame().createTupleReference();
}
- public BTreeOpContext(IIndexAccessor accessor, ITreeIndexFrameFactory leafFrameFactory,
- ITreeIndexFrameFactory interiorFrameFactory, IPageManager freePageManager,
- IBinaryComparatorFactory[] cmpFactories, IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback, int[] nonIndexFields) {
- this(accessor, leafFrameFactory, interiorFrameFactory, freePageManager, cmpFactories, modificationCallback,
- searchCallback);
- this.tupleWithNonIndexFields = new PermutingTupleReference(nonIndexFields);
- }
-
@Override
public void reset() {
if (pageLsns != null) {
@@ -378,14 +367,6 @@
this.leafFrameFactory = leafFrameFactory;
}
- public ITupleReference getTupleWithNonIndexFields() {
- return tupleWithNonIndexFields;
- }
-
- public void resetNonIndexFieldsTuple(ITupleReference newValue) {
- tupleWithNonIndexFields.reset(newValue);
- }
-
@Override
public void destroy() throws HyracksDataException {
if (destroyed) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
index eee43b5..523ed9b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java
@@ -198,12 +198,6 @@
return new DiskBTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback());
}
- @Override
- public DiskBTreeAccessor createAccessor(IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback, int[] logTupleFields) {
- return new DiskBTreeAccessor(this, modificationCallback, searchCallback, logTupleFields);
- }
-
public class DiskBTreeAccessor extends BTreeAccessor {
public DiskBTreeAccessor(DiskBTree btree, IModificationOperationCallback modificationCalback,
@@ -211,11 +205,6 @@
super(btree, modificationCalback, searchCallback);
}
- public DiskBTreeAccessor(DiskBTree btree, IModificationOperationCallback modificationCalback,
- ISearchOperationCallback searchCallback, int[] logTupleFields) {
- super(btree, modificationCalback, searchCallback, logTupleFields);
- }
-
@Override
public void insert(ITupleReference tuple) throws HyracksDataException {
throw new UnsupportedOperationException("Insert is not supported by DiskBTree. ");
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IExtendedModificationOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IExtendedModificationOperationCallback.java
new file mode 100644
index 0000000..7bb4e82
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IExtendedModificationOperationCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+
+public interface IExtendedModificationOperationCallback extends IModificationOperationCallback {
+ /**
+ * Called after the action taken in found, to take action on a tuple that is not part of the index
+ * itself but is part of an ancillary structure that is updated alongside the index. An example would
+ * be a simple statistic on the index that records the minimum and maximum values.
+ *
+ * @param after
+ * The tuple to feed to the ancilliary structure
+ * @throws HyracksDataException
+ */
+
+ void after(ITupleReference after) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/ExtendedIndexAccessParameters.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/ExtendedIndexAccessParameters.java
new file mode 100644
index 0000000..dbefe74
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/ExtendedIndexAccessParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.common.impls;
+
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
+
+public class ExtendedIndexAccessParameters extends IndexAccessParameters implements IIndexAccessParameters {
+
+ protected final IExtendedModificationOperationCallback extendedModificationCallback;
+ // This map is used to put additional parameters to an index accessor.
+
+ public ExtendedIndexAccessParameters(IExtendedModificationOperationCallback extendedModificationCallback,
+ ISearchOperationCallback searchOperationCallback) {
+ super(extendedModificationCallback, searchOperationCallback);
+ this.extendedModificationCallback = extendedModificationCallback;
+ }
+
+ @Override
+ public IExtendedModificationOperationCallback getModificationCallback() {
+ return extendedModificationCallback;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java
index 2edea70..0068f4f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.Map;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -34,7 +35,7 @@
}
@Override
- public IModificationOperationCallback getModificationCallback() {
+ public IExtendedModificationOperationCallback getModificationCallback() {
return NoOpOperationCallback.INSTANCE;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index 15aba57..85417f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -21,13 +21,15 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
/**
* Dummy operation callback that simply does nothing.
*/
-public enum NoOpOperationCallback implements IModificationOperationCallback, ISearchOperationCallback {
+public enum NoOpOperationCallback
+ implements IModificationOperationCallback, ISearchOperationCallback, IExtendedModificationOperationCallback {
INSTANCE;
@Override
@@ -59,4 +61,9 @@
public void complete(ITupleReference tuple) throws HyracksDataException {
// Do nothing.
}
+
+ @Override
+ public void after(ITupleReference tuple) throws HyracksDataException {
+ //Do nothing.
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
index ff47d27..636e4f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -26,6 +26,7 @@
UPDATE,
UPSERT,
SEARCH,
+ FILTER_MOD,
DISKORDERSCAN,
PHYSICALDELETE,
NOOP,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
index 1c74275..7f53ed5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -65,7 +66,7 @@
// This should never be needed for disk only indexes
@Override
- public IModificationOperationCallback getModificationCallback() {
+ public IExtendedModificationOperationCallback getModificationCallback() {
return null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 4578eb3..41a11e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -32,11 +32,13 @@
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
@@ -151,7 +153,6 @@
if (ctx.getIndexTuple() != null) {
ctx.getIndexTuple().reset(tuple);
indexTuple = ctx.getIndexTuple();
- ctx.getCurrentMutableBTreeAccessor().getOpContext().resetNonIndexFieldsTuple(tuple);
} else {
indexTuple = tuple;
}
@@ -303,7 +304,8 @@
List<ITupleReference> filterTuples = new ArrayList<>();
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
+ getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder());
}
// Write metadata from memory component to disk
@@ -353,7 +355,8 @@
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
}
- getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
+ getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
mergedComponent.getMetadataHolder());
}
@@ -396,8 +399,9 @@
int numBloomFilterKeyFields = hasBloomFilter
? ((LSMBTreeWithBloomFilterDiskComponentFactory) componentFactory).getBloomFilterKeyFields().length : 0;
return new LSMBTreeOpContext(this, memoryComponents, insertLeafFrameFactory, deleteLeafFrameFactory,
- iap.getModificationCallback(), iap.getSearchOperationCallback(), numBloomFilterKeyFields,
- getTreeFields(), getFilterFields(), getHarness(), getFilterCmpFactories(), tracer);
+ (IExtendedModificationOperationCallback) iap.getModificationCallback(),
+ iap.getSearchOperationCallback(), numBloomFilterKeyFields, getTreeFields(), getFilterFields(),
+ getHarness(), getFilterCmpFactories(), tracer);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 7969ba3..1cfc414 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -29,8 +29,10 @@
import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext;
import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
@@ -69,7 +71,7 @@
public LSMBTreeOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents,
ITreeIndexFrameFactory insertLeafFrameFactory, ITreeIndexFrameFactory deleteLeafFrameFactory,
- IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback,
+ IExtendedModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback,
int numBloomFilterKeyFields, int[] btreeFields, int[] filterFields, ILSMHarness lsmHarness,
IBinaryComparatorFactory[] filterCmpFactories, ITracer tracer) {
super(index, btreeFields, filterFields, filterCmpFactories, searchCallback, modificationCallback, tracer);
@@ -90,14 +92,9 @@
for (int i = 0; i < mutableComponents.size(); i++) {
LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) mutableComponents.get(i);
mutableBTrees[i] = mutableComponent.getIndex();
- if (allFields != null) {
- mutableBTreeAccessors[i] = mutableBTrees[i].createAccessor(modificationCallback,
- NoOpOperationCallback.INSTANCE, allFields);
- } else {
- IIndexAccessParameters iap =
- new IndexAccessParameters(modificationCallback, NoOpOperationCallback.INSTANCE);
- mutableBTreeAccessors[i] = mutableBTrees[i].createAccessor(iap);
- }
+ IIndexAccessParameters iap =
+ new IndexAccessParameters(modificationCallback, NoOpOperationCallback.INSTANCE);
+ mutableBTreeAccessors[i] = mutableBTrees[i].createAccessor(iap);
mutableBTreeOpCtxs[i] = mutableBTreeAccessors[i].getOpContext();
}
this.insertLeafFrameFactory = insertLeafFrameFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java
index b92e0d2..34f8855 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java
@@ -21,14 +21,16 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.common.MultiComparator;
public interface ILSMComponentFilter {
- void update(ITupleReference tuple, MultiComparator cmp) throws HyracksDataException;
-
boolean satisfy(ITupleReference min, ITupleReference max, MultiComparator cmp) throws HyracksDataException;
+ void update(ITupleReference tuple, MultiComparator cmp, IExtendedModificationOperationCallback opCallback)
+ throws HyracksDataException;
+
ITupleReference getMinTuple();
ITupleReference getMaxTuple();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java
index f310b4e..64b0c44 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java
@@ -22,11 +22,13 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
public interface ILSMComponentFilterManager {
- void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples) throws HyracksDataException;
+ void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples,
+ IExtendedModificationOperationCallback opCallback) throws HyracksDataException;
boolean readFilter(ILSMComponentFilter filter, ITreeIndex index) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 0e1a5e4..61ef6cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -263,4 +263,11 @@
* @throws HyracksDataException
*/
void deleteComponents(Predicate<ILSMComponent> predicate) throws HyracksDataException;
+
+ /**
+ * Update the filter of an LSM index
+ * @param tuple
+ * @throws HyracksDataException
+ */
+ void updateFilter(ITupleReference tuple) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 79b3262..b34b403 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
@@ -35,7 +36,7 @@
ISearchOperationCallback getSearchOperationCallback();
- IModificationOperationCallback getModificationCallback();
+ IExtendedModificationOperationCallback getModificationCallback();
void setCurrentMutableComponentId(int currentMutableComponentId);
@@ -84,6 +85,14 @@
*/
boolean isTracingEnabled();
+ boolean isFilterSkipped();
+
+ void setFilterSkip(boolean skip);
+
+ boolean isRecovery();
+
+ void setRecovery(boolean recovery);
+
/**
* @return the IO Operation type associated with this context
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 0368a09..bd91094 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -662,10 +662,15 @@
@Override
public void updateFilter(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException {
- if (ctx.getFilterTuple() != null) {
- ctx.getFilterTuple().reset(tuple);
- memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter().update(ctx.getFilterTuple(),
- ctx.getFilterCmp());
+ if (ctx.getFilterTuple() != null && !ctx.isFilterSkipped()) {
+ if (ctx.isRecovery()) {
+ memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter().update(tuple,
+ ctx.getFilterCmp(), ctx.getModificationCallback());
+ } else {
+ ctx.getFilterTuple().reset(tuple);
+ memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter()
+ .update(ctx.getFilterTuple(), ctx.getFilterCmp(), ctx.getModificationCallback());
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
index 72c2b07..17aa394 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -43,24 +44,25 @@
protected final PermutingTupleReference indexTuple;
protected final MultiComparator filterCmp;
protected final PermutingTupleReference filterTuple;
- protected final int[] allFields;
protected final List<ILSMComponent> componentHolder;
protected final List<ILSMDiskComponent> componentsToBeMerged;
protected final List<ILSMDiskComponent> componentsToBeReplicated;
protected final ISearchOperationCallback searchCallback;
- protected final IModificationOperationCallback modificationCallback;
+ protected final IExtendedModificationOperationCallback modificationCallback;
protected IndexOperation op;
protected boolean accessingComponents = false;
protected ISearchPredicate searchPredicate;
protected final ITracer tracer;
protected final long traceCategory;
private long enterExitTime = 0L;
+ protected boolean skipFilter = false;
+ protected boolean recovery = false;
private LSMIOOperationType ioOpType = LSMIOOperationType.NOOP;
private ILSMDiskComponent newDiskComponent;
public AbstractLSMIndexOperationContext(ILSMIndex index, int[] treeFields, int[] filterFields,
IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback, ITracer tracer) {
+ IExtendedModificationOperationCallback modificationCallback, ITracer tracer) {
this.index = index;
this.searchCallback = searchCallback;
this.modificationCallback = modificationCallback;
@@ -71,18 +73,10 @@
indexTuple = new PermutingTupleReference(treeFields);
filterCmp = MultiComparator.create(filterCmpFactories);
filterTuple = new PermutingTupleReference(filterFields);
- allFields = new int[treeFields.length + filterFields.length];
- for (int i = 0; i < treeFields.length; i++) {
- allFields[i] = treeFields[i];
- }
- for (int i = treeFields.length; i < treeFields.length + filterFields.length; i++) {
- allFields[i] = filterFields[i - treeFields.length];
- }
} else {
indexTuple = null;
filterCmp = null;
filterTuple = null;
- allFields = null;
}
this.tracer = tracer;
this.traceCategory = tracer.getRegistry().get("op-ctx");
@@ -137,7 +131,7 @@
}
@Override
- public IModificationOperationCallback getModificationCallback() {
+ public IExtendedModificationOperationCallback getModificationCallback() {
return modificationCallback;
}
@@ -196,6 +190,25 @@
}
@Override
+ public boolean isFilterSkipped() {
+ return skipFilter;
+ }
+
+ @Override
+ public void setFilterSkip(boolean skip) {
+ this.skipFilter = skip;
+ }
+
+ @Override
+ public boolean isRecovery() {
+ return recovery;
+ }
+
+ @Override
+ public void setRecovery(boolean recovery) {
+ this.recovery = recovery;
+ }
+
public LSMIOOperationType getIoOperationType() {
return ioOpType;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
index a992c5e..7bb24dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -91,7 +92,7 @@
}
@Override
- public IModificationOperationCallback getModificationCallback() {
+ public IExtendedModificationOperationCallback getModificationCallback() {
return null;
}
@@ -216,6 +217,26 @@
}
@Override
+ public boolean isFilterSkipped() {
+ return false;
+ }
+
+ @Override
+ public void setFilterSkip(boolean skip) {
+ //not used in recovery
+ }
+
+ @Override
+ public boolean isRecovery() {
+ return false;
+ }
+
+ @Override
+ public void setRecovery(boolean recovery) {
+ //not used in recovery
+ }
+
+ @Override
public void destroy() throws HyracksDataException {
// No Op.. Nothing to destroy
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
index 625f81e..43d2b0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
@@ -76,6 +77,6 @@
private void updateFilter(ITupleReference tuple) throws HyracksDataException {
filterTuple.reset(tuple);
- filter.update(filterTuple, filterCmp);
+ filter.update(filterTuple, filterCmp, NoOpOperationCallback.INSTANCE);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
index 5f01550..6ccb114 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
@@ -63,10 +64,14 @@
}
@Override
- public void update(ITupleReference tuple, MultiComparator cmp) throws HyracksDataException {
+ public void update(ITupleReference tuple, MultiComparator cmp, IExtendedModificationOperationCallback opCallback)
+ throws HyracksDataException {
+ boolean logged = false;
if (minTuple == null) {
int numBytes = tupleWriter.bytesRequired(tuple);
minTupleBytes = new byte[numBytes];
+ opCallback.after(tuple);
+ logged = true;
tupleWriter.writeTuple(tuple, minTupleBytes, 0);
minTupleBuf = ByteBuffer.wrap(minTupleBytes);
minTuple = tupleWriter.createTupleReference();
@@ -74,6 +79,8 @@
} else {
int c = cmp.compare(tuple, minTuple);
if (c < 0) {
+ opCallback.after(tuple);
+ logged = true;
int numBytes = tupleWriter.bytesRequired(tuple);
if (minTupleBytes.length < numBytes) {
minTupleBytes = new byte[numBytes];
@@ -88,6 +95,9 @@
if (maxTuple == null) {
int numBytes = tupleWriter.bytesRequired(tuple);
maxTupleBytes = new byte[numBytes];
+ if (!logged) {
+ opCallback.after(tuple);
+ }
tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
maxTuple = tupleWriter.createTupleReference();
@@ -95,6 +105,9 @@
} else {
int c = cmp.compare(tuple, maxTuple);
if (c > 0) {
+ if (!logged) {
+ opCallback.after(tuple);
+ }
int numBytes = tupleWriter.bytesRequired(tuple);
if (maxTupleBytes.length < numBytes) {
maxTupleBytes = new byte[numBytes];
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
index 90ca7d6..c2b4842 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
@@ -23,9 +23,11 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
@@ -42,11 +44,11 @@
}
@Override
- public void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples)
- throws HyracksDataException {
+ public void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples,
+ IExtendedModificationOperationCallback opCallback) throws HyracksDataException {
MultiComparator filterCmp = MultiComparator.create(filter.getFilterCmpFactories());
for (ITupleReference tuple : filterTuples) {
- filter.update(tuple, filterCmp);
+ filter.update(tuple, filterCmp, opCallback);
}
}
@@ -78,7 +80,7 @@
List<ITupleReference> filterTuples = new ArrayList<>();
filterTuples.add(filterFrame.getMinTuple());
filterTuples.add(filterFrame.getMaxTuple());
- updateFilter(filter, filterTuples);
+ updateFilter(filter, filterTuples, NoOpOperationCallback.INSTANCE);
return true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 9794a98..1548f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -205,6 +205,7 @@
return cursorFactory.create(ctx);
}
+ @Override
public void updateFilter(ITupleReference tuple) throws HyracksDataException {
ctx.setOperation(IndexOperation.UPSERT);
lsmHarness.updateFilter(ctx, tuple);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index a395e67..5fda514 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -155,13 +156,12 @@
if (ctx.getIndexTuple() != null) {
ctx.getIndexTuple().reset(tuple);
indexTuple = ctx.getIndexTuple();
- ((InMemoryInvertedIndexAccessor) (ctx.getCurrentMutableInvIndexAccessors())).resetLogTuple(tuple);
} else {
indexTuple = tuple;
}
- ctx.getModificationCallback().before(tuple);
- ctx.getModificationCallback().found(null, tuple);
+ ctx.getModificationCallback().before(indexTuple);
+ ctx.getModificationCallback().found(null, indexTuple);
switch (ctx.getOperation()) {
case INSERT:
// Insert into the in-memory inverted index.
@@ -336,7 +336,7 @@
List<ITupleReference> filterTuples = new ArrayList<>();
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples);
+ filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples, NoOpOperationCallback.INSTANCE);
filterManager.writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder());
}
flushingComponent.getMetadata().copy(component.getMetadata());
@@ -399,7 +399,8 @@
filterTuples.add(max);
}
}
- getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
+ getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder());
}
componentBulkLoader.end();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index d7408ff..247e44c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -208,6 +208,11 @@
}
@Override
+ public void updateFilter(ITupleReference tuple) throws HyracksDataException {
+ lsmHarness.updateFilter(ctx, tuple);
+ }
+
+ @Override
public void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException {
throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index e7a725e..e238eb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -54,18 +55,14 @@
IIndexAccessParameters iap, int[] invertedIndexFields, int[] filterFields,
IBinaryComparatorFactory[] filterComparatorFactories, ITracer tracer) throws HyracksDataException {
super(index, invertedIndexFields, filterFields, filterComparatorFactories, iap.getSearchOperationCallback(),
- iap.getModificationCallback(), tracer);
+ (IExtendedModificationOperationCallback) iap.getModificationCallback(), tracer);
mutableInvIndexAccessors = new IInvertedIndexAccessor[mutableComponents.size()];
deletedKeysBTreeAccessors = new IIndexAccessor[mutableComponents.size()];
for (int i = 0; i < mutableComponents.size(); i++) {
LSMInvertedIndexMemoryComponent mutableComponent =
(LSMInvertedIndexMemoryComponent) mutableComponents.get(i);
- if (allFields != null) {
- mutableInvIndexAccessors[i] = mutableComponent.getIndex().createAccessor(iap, allFields);
- } else {
- mutableInvIndexAccessors[i] =
- mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- }
+ mutableInvIndexAccessors[i] =
+ mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
deletedKeysBTreeAccessors[i] =
mutableComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index e74733b..4970ccb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -182,13 +182,6 @@
(IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
}
- public InMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap, int[] nonIndexFields)
- throws HyracksDataException {
- return new InMemoryInvertedIndexAccessor(this,
- new InMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), nonIndexFields,
- (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
- }
-
@Override
public IBufferCache getBufferCache() {
return btree.getBufferCache();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
index 2a35301..e48f16f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
@@ -56,16 +56,6 @@
this.btreeAccessor = index.getBTree().createAccessor(NoOpIndexAccessParameters.INSTANCE);
}
- public InMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx,
- int[] nonIndexFields, IHyracksTaskContext ctx) throws HyracksDataException {
- this.ctx = ctx;
- this.opCtx = opCtx;
- this.index = index;
- this.searcher = null;
- this.btreeAccessor = index.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE, nonIndexFields);
- }
-
@Override
public void insert(ITupleReference tuple) throws HyracksDataException {
opCtx.setOperation(IndexOperation.INSERT);
@@ -137,10 +127,6 @@
return null;
}
- public void resetLogTuple(ITupleReference newTuple) {
- btreeAccessor.getOpContext().resetNonIndexFieldsTuple(newTuple);
- }
-
@Override
public void destroy() throws HyracksDataException {
if (destroyed) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
index 986ceac..806bbf5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
@@ -91,14 +91,6 @@
}
@Override
- public PartitionedInMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap, int[] nonIndexFields)
- throws HyracksDataException {
- return new PartitionedInMemoryInvertedIndexAccessor(this,
- new PartitionedInMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory),
- nonIndexFields, iap);
- }
-
- @Override
public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java
index b5044d0..64dc4c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java
@@ -34,12 +34,6 @@
super(index, opCtx, (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
}
- public PartitionedInMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx,
- int[] nonIndexFields, IIndexAccessParameters iap) throws HyracksDataException {
- super(index, opCtx, nonIndexFields,
- (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT));
- }
-
protected IInvertedIndexSearcher createSearcher() throws HyracksDataException {
return new PartitionedTOccurrenceSearcher(index, ctx);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 0dc25ec..e36abb4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.IPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
@@ -201,13 +202,12 @@
if (ctx.getIndexTuple() != null) {
ctx.getIndexTuple().reset(tuple);
indexTuple = ctx.getIndexTuple();
- ctx.getCurrentMutableRTreeAccessor().getOpContext().resetNonIndexFieldsTuple(tuple);
} else {
indexTuple = tuple;
}
ctx.getModificationCallback().before(indexTuple);
- ctx.getModificationCallback().found(null, tuple);
+ ctx.getModificationCallback().found(null, indexTuple);
if (ctx.getOperation() == IndexOperation.INSERT) {
ctx.getCurrentMutableRTreeAccessor().insert(indexTuple);
} else {
@@ -230,8 +230,9 @@
@Override
protected LSMRTreeOpContext createOpContext(IIndexAccessParameters iap) {
return new LSMRTreeOpContext(this, memoryComponents, rtreeLeafFrameFactory, rtreeInteriorFrameFactory,
- btreeLeafFrameFactory, iap.getModificationCallback(), iap.getSearchOperationCallback(), getTreeFields(),
- getFilterFields(), getHarness(), comparatorFields, linearizerArray, getFilterCmpFactories(), tracer);
+ btreeLeafFrameFactory, (IExtendedModificationOperationCallback) iap.getModificationCallback(),
+ iap.getSearchOperationCallback(), getTreeFields(), getFilterFields(), getHarness(), comparatorFields,
+ linearizerArray, getFilterCmpFactories(), tracer);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 7c1467c..4510618 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.DualTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
@@ -151,7 +152,8 @@
List<ITupleReference> filterTuples = new ArrayList<>();
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
+ getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder());
}
// Note. If we change the filter to write to metadata object, we don't need the if block above
@@ -289,7 +291,8 @@
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
}
- getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples);
+ getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(),
mergedComponent.getMetadataHolder());
}
@@ -389,7 +392,6 @@
if (ctx.getIndexTuple() != null) {
ctx.getIndexTuple().reset(tuple);
indexTuple = ctx.getIndexTuple();
- ctx.getCurrentMutableRTreeAccessor().getOpContext().resetNonIndexFieldsTuple(tuple);
} else {
indexTuple = tuple;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 858f6e0..eb79960 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -57,7 +58,7 @@
public LSMRTreeOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents,
ITreeIndexFrameFactory rtreeLeafFrameFactory, ITreeIndexFrameFactory rtreeInteriorFrameFactory,
- ITreeIndexFrameFactory btreeLeafFrameFactory, IModificationOperationCallback modificationCallback,
+ ITreeIndexFrameFactory btreeLeafFrameFactory, IExtendedModificationOperationCallback modificationCallback,
ISearchOperationCallback searchCallback, int[] rtreeFields, int[] filterFields, ILSMHarness lsmHarness,
int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray,
IBinaryComparatorFactory[] filterComparatorFactories, ITracer tracer) {
@@ -69,13 +70,7 @@
btreeOpContexts = new BTreeOpContext[mutableComponents.size()];
for (int i = 0; i < mutableComponents.size(); i++) {
LSMRTreeMemoryComponent mutableComponent = (LSMRTreeMemoryComponent) mutableComponents.get(i);
- if (allFields != null) {
- mutableRTreeAccessors[i] = mutableComponent.getIndex().createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE, allFields);
- } else {
- mutableRTreeAccessors[i] =
- mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- }
+ mutableRTreeAccessors[i] = mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
mutableBTreeAccessors[i] =
mutableComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -146,4 +141,4 @@
throw HyracksDataException.create(failure);
}
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index 3bb94ed..a3ba4b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
@@ -174,7 +175,8 @@
List<ITupleReference> filterTuples = new ArrayList<>();
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple());
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
- getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
+ getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder());
}
flushingComponent.getMetadata().copy(component.getMetadata());
@@ -249,7 +251,8 @@
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple());
filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple());
}
- getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples);
+ getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples,
+ NoOpOperationCallback.INSTANCE);
getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 5582075..f6f0a3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -157,12 +157,6 @@
modificationCallback);
}
- private RTreeOpContext createOpContext(IModificationOperationCallback modificationCallback, int[] nonIndexFields) {
- return new RTreeOpContext((IRTreeLeafFrame) leafFrameFactory.createFrame(),
- (IRTreeInteriorFrame) interiorFrameFactory.createFrame(), freePageManager, cmpFactories,
- modificationCallback, nonIndexFields);
- }
-
private ICachedPage findLeaf(RTreeOpContext ctx) throws HyracksDataException {
int pageId = rootPage;
boolean writeLatched = false;
@@ -305,7 +299,7 @@
if (!isLeaf) {
ctx.getInteriorFrame().insert(tuple, -1);
} else {
- foundModCallback(ctx, null, tuple);
+ ctx.getModificationCallback().found(null, tuple);
ctx.getLeafFrame().insert(tuple, -1);
}
succeeded = true;
@@ -330,7 +324,7 @@
ctx.getInteriorFrame().insert(tuple, -1);
} else {
ctx.getLeafFrame().compact();
- foundModCallback(ctx, null, tuple);
+ ctx.getModificationCallback().found(null, tuple);
ctx.getLeafFrame().insert(tuple, -1);
}
succeeded = true;
@@ -368,7 +362,7 @@
rightFrame.setPage(rightNode);
rightFrame.initBuffer((byte) 0);
rightFrame.setRightPage(ctx.getInteriorFrame().getRightPage());
- foundModCallback(ctx, null, tuple);
+ ctx.getModificationCallback().found(null, tuple);
ctx.getLeafFrame().split(rightFrame, tuple, ctx.getSplitKey(), ctx, bufferCache);
ctx.getLeafFrame().setRightPage(rightPageId);
}
@@ -761,11 +755,6 @@
return new RTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback());
}
- public RTreeAccessor createAccessor(IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback, int[] nonIndexFields) {
- return new RTreeAccessor(this, modificationCallback, searchCallback, nonIndexFields);
- }
-
public class RTreeAccessor implements ITreeIndexAccessor {
private RTree rtree;
private RTreeOpContext ctx;
@@ -777,12 +766,6 @@
this.ctx = rtree.createOpContext(modificationCallback);
}
- public RTreeAccessor(RTree rtree, IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback, int[] nonIndexFields) {
- this.rtree = rtree;
- this.ctx = rtree.createOpContext(modificationCallback, nonIndexFields);
- }
-
public void reset(RTree rtree, IModificationOperationCallback modificationCallback) {
this.rtree = rtree;
ctx.setModificationCallback(modificationCallback);
@@ -1126,13 +1109,4 @@
public int getNumOfFilterFields() {
return 0;
}
-
- private void foundModCallback(RTreeOpContext ctx, ITupleReference before, ITupleReference after)
- throws HyracksDataException {
- if (ctx.getTupleWithNonIndexFields() != null) {
- ctx.getModificationCallback().found(before, ctx.getTupleWithNonIndexFields());
- } else {
- ctx.getModificationCallback().found(before, after);
- }
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index 46e6b22..76071f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -30,7 +30,6 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
@@ -61,7 +60,6 @@
private IModificationOperationCallback modificationCallback;
- private PermutingTupleReference tupleWithNonIndexFields;
private boolean destroyed = false;
public RTreeOpContext(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame, IPageManager freePageManager,
@@ -83,13 +81,6 @@
LSNUpdates = new ArrayList<>();
}
- public RTreeOpContext(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame, IPageManager freePageManager,
- IBinaryComparatorFactory[] cmpFactories, IModificationOperationCallback modificationCallback,
- int[] nonIndexFields) {
- this(leafFrame, interiorFrame, freePageManager, cmpFactories, modificationCallback);
- tupleWithNonIndexFields = new PermutingTupleReference(nonIndexFields);
- }
-
public ITupleReference getTuple() {
return tuple;
}
@@ -201,14 +192,6 @@
return cursorInitialState;
}
- public ITupleReference getTupleWithNonIndexFields() {
- return tupleWithNonIndexFields;
- }
-
- public void resetNonIndexFieldsTuple(ITupleReference newValue) {
- tupleWithNonIndexFields.reset(newValue);
- }
-
@Override
public void destroy() throws HyracksDataException {
if (destroyed) {
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
index 1c81f68..e915112 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
@@ -84,7 +85,7 @@
}
}
- private class VeriyfingModificationCallback implements IModificationOperationCallback {
+ private class VeriyfingModificationCallback implements IExtendedModificationOperationCallback {
@Override
public void before(ITupleReference tuple) throws HyracksDataException {
@@ -100,6 +101,11 @@
}
Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, after));
}
+
+ @Override
+ public void after(ITupleReference tuple) throws HyracksDataException {
+ Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, tuple));
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
index d796ece..3a5c8fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
@@ -22,10 +22,11 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
-public enum TestOperationCallback implements ISearchOperationCallback, IModificationOperationCallback {
+public enum TestOperationCallback implements ISearchOperationCallback, IExtendedModificationOperationCallback {
INSTANCE;
private static final int RANDOM_SEED = 50;
@@ -65,4 +66,9 @@
public void complete(ITupleReference tuple) throws HyracksDataException {
// Do nothing.
}
+
+ @Override
+ public void after(ITupleReference tuple) throws HyracksDataException {
+ // Do nothing.
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
index d42c3b6..fc852cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.btree.AbstractOperationCallbackTest;
import org.apache.hyracks.storage.am.common.api.IBTreeIndexTupleReference;
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
@@ -153,7 +154,7 @@
test((IIndexAccessor a) -> a.upsert(tuple), (IIndexAccessor a) -> a.upsert(tuple));
}
- private class VerifyingUpdateModificationCallback implements IModificationOperationCallback {
+ private class VerifyingUpdateModificationCallback implements IExtendedModificationOperationCallback {
private final ITupleReference tuple;
@@ -176,6 +177,11 @@
}
Assert.assertEquals(0, cmp.compare(this.tuple, after));
}
+
+ @Override
+ public void after(ITupleReference tuple) {
+ //Nothing to do there, not testing filters
+ }
}
}