ASTERIXDB-139: Add temp workspace files deletion to IOManager
This change includes the following:
- Add a method to delete temp workspace files (WAF)
- Expose LSMComponents files suffixes to Asterix
Change-Id: I760074764755e7aee100ff33c14b13bf4b29ec2e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/337
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index de16342..a0cca95 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -54,4 +54,6 @@
public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
public void setExecutor(Executor executor);
+
+ public void deleteWorkspaceFiles();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/impl/AbstractReplicationJob.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/impl/AbstractReplicationJob.java
index 20f1410..de0f1f8 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/impl/AbstractReplicationJob.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/impl/AbstractReplicationJob.java
@@ -23,19 +23,19 @@
import org.apache.hyracks.api.replication.IReplicationJob;
public abstract class AbstractReplicationJob implements IReplicationJob {
-
private final Set<String> filesToReplicate;
private final ReplicationOperation operation;
private final ReplicationExecutionType executionType;
private final ReplicationJobType jobType;
- public AbstractReplicationJob(ReplicationJobType jobType, ReplicationOperation operation, ReplicationExecutionType executionType, Set<String> filesToReplicate){
+ public AbstractReplicationJob(ReplicationJobType jobType, ReplicationOperation operation,
+ ReplicationExecutionType executionType, Set<String> filesToReplicate) {
this.jobType = jobType;
this.operation = operation;
this.executionType = executionType;
this.filesToReplicate = filesToReplicate;
}
-
+
@Override
public Set<String> getJobFiles() {
return filesToReplicate;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index c4e462d..3b88f02 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.nc.io;
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -35,12 +36,10 @@
import org.apache.hyracks.api.io.IODeviceHandle;
public class IOManager implements IIOManager {
+ private static final String WORKSPACE_FILE_SUFFIX = ".waf";
private final List<IODeviceHandle> ioDevices;
-
private Executor executor;
-
private final List<IODeviceHandle> workAreaIODevices;
-
private int workAreaDeviceIndex;
public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
@@ -167,7 +166,7 @@
String waPath = dev.getWorkAreaPath();
File waf;
try {
- waf = File.createTempFile(prefix, ".waf", new File(dev.getPath(), waPath));
+ waf = File.createTempFile(prefix, WORKSPACE_FILE_SUFFIX, new File(dev.getPath(), waPath));
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -256,4 +255,23 @@
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public void deleteWorkspaceFiles() {
+ for (IODeviceHandle ioDevice : workAreaIODevices) {
+ File workspaceFolder = new File(ioDevice.getPath(), ioDevice.getWorkAreaPath());
+ if (workspaceFolder.exists() && workspaceFolder.isDirectory()) {
+ File[] workspaceFiles = workspaceFolder.listFiles(WORKSPACE_FILES_FILTER);
+ for (File workspaceFile : workspaceFiles) {
+ workspaceFile.delete();
+ }
+ }
+ }
+ }
+
+ private static final FilenameFilter WORKSPACE_FILES_FILTER = new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.endsWith(WORKSPACE_FILE_SUFFIX);
+ }
+ };
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index c3031c6..2be4e4a 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -40,7 +40,7 @@
import org.apache.hyracks.storage.common.file.IFileMapProvider;
public class LSMBTreeFileManager extends AbstractLSMIndexFileManager {
- private static final String BTREE_STRING = "b";
+ public static final String BTREE_STRING = "b";
private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
index df5addb..ed39630 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
@@ -39,8 +39,8 @@
import org.apache.hyracks.storage.common.file.IFileMapProvider;
public class LSMBTreeWithBuddyFileManager extends AbstractLSMIndexFileManager {
- private static final String BUDDY_BTREE_STRING = "buddy";
- private static final String BTREE_STRING = "b";
+ public static final String BUDDY_BTREE_STRING = "buddy";
+ public static final String BTREE_STRING = "b";
private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
private final TreeIndexFactory<? extends ITreeIndex> buddyBtreeFactory;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index d0d7c26..55e016b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -16,15 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public interface ILSMHarness {
@@ -57,10 +58,10 @@
public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
public ILSMOperationTracker getOperationTracker();
-
- public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload)
- throws HyracksDataException;
-
+
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
+ LSMOperationType opType) throws HyracksDataException;
+
public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 6c6dc9e..97dfa97 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
@@ -25,6 +24,7 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMOperationType;
/**
* Client handle for performing operations
@@ -118,6 +118,7 @@
public void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
-
- public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException;
+
+ public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+ throws HyracksDataException;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
index 3f08406..59a035f 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
@@ -30,6 +29,7 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public interface ILSMIndexInternal extends ILSMIndex {
@Override
@@ -85,8 +85,8 @@
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException;
public boolean isCurrentMutableComponentEmpty() throws HyracksDataException;
-
+
public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
- ReplicationOperation operation) throws HyracksDataException;
+ ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java
index b32093e..f6057ed 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java
@@ -20,8 +20,16 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public interface ILSMIndexReplicationJob extends IReplicationJob {
public void endReplication() throws HyracksDataException;
+
+ public ILSMIndexOperationContext getLSMIndexOperationContext();
+
+ public ILSMIndex getLSMIndex();
+
+ public LSMOperationType getLSMOpType();
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3a7c632..3d3f959 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -303,12 +303,12 @@
public void addInactiveDiskComponent(ILSMComponent diskComponent) {
inactiveDiskComponents.add(diskComponent);
}
-
+
public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent);
@Override
public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
- ReplicationOperation operation) throws HyracksDataException {
+ ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
//get set of files to be replicated for this component
Set<String> componentFiles = new HashSet<String>();
@@ -325,13 +325,12 @@
}
//create replication job and submit it
- LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation,
- executionType);
+ LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation, executionType,
+ opType);
try {
- diskBufferCache.getIIOReplicationManager().submitJob(job);
+ diskBufferCache.getIOReplicationManager().submitJob(job);
} catch (IOException e) {
throw new HyracksDataException(e);
}
-
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index e0a48d7..3ad1396 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -46,7 +46,7 @@
public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManager {
- protected static final String SPLIT_STRING = "_";
+ public static final String SPLIT_STRING = "_";
protected static final String BLOOM_FILTER_STRING = "f";
protected static final String TRANSACTION_PREFIX = ".T";
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 5721c07..c2aff6e 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -41,7 +41,8 @@
public class ExternalIndexHarness extends LSMHarness {
private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
- public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) {
+ public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy,
+ ILSMOperationTracker opTracker, boolean replicationEnabled) {
super(lsmIndex, mergePolicy, opTracker, replicationEnabled);
}
@@ -119,7 +120,8 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(c);
- lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE);
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated, false,
+ ReplicationOperation.DELETE, opType);
}
((AbstractDiskLSMComponent) c).destroy();
break;
@@ -137,7 +139,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(newComponent);
- triggerReplication(componentsToBeReplicated, false);
+ triggerReplication(componentsToBeReplicated, false, opType);
}
mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
@@ -241,7 +243,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(c);
- triggerReplication(componentsToBeReplicated, true);
+ triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
}
// Enter the component
enterComponent(c);
@@ -334,7 +336,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(diskComponent);
- lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE);
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, null);
}
((AbstractDiskLSMComponent) diskComponent).destroy();
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index f78e6d3..bee62ed 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -55,8 +55,9 @@
protected final AtomicBoolean fullMergeIsRequested;
protected final boolean replicationEnabled;
protected List<ILSMComponent> componentsToBeReplicated;
-
- public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) {
+
+ public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ boolean replicationEnabled) {
this.lsmIndex = lsmIndex;
this.opTracker = opTracker;
this.mergePolicy = mergePolicy;
@@ -230,7 +231,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(newComponent);
- triggerReplication(componentsToBeReplicated, false);
+ triggerReplication(componentsToBeReplicated, false, opType);
}
mergePolicy.diskComponentAdded(lsmIndex, false);
}
@@ -242,7 +243,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(newComponent);
- triggerReplication(componentsToBeReplicated, false);
+ triggerReplication(componentsToBeReplicated, false, opType);
}
mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
@@ -296,9 +297,9 @@
//schedule a replication job to delete these inactive disk components from replicas
if (replicationEnabled) {
lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false,
- ReplicationOperation.DELETE);
+ ReplicationOperation.DELETE, opType);
}
-
+
for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
((AbstractDiskLSMComponent) c).destroy();
}
@@ -460,7 +461,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(c);
- triggerReplication(componentsToBeReplicated, true);
+ triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
}
mergePolicy.diskComponentAdded(lsmIndex, false);
}
@@ -470,23 +471,24 @@
public ILSMOperationTracker getOperationTracker() {
return opTracker;
}
-
- protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException {
+
+ protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+ throws HyracksDataException {
ILSMIndexAccessorInternal accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleReplication(lsmComponents, bulkload);
+ accessor.scheduleReplication(lsmComponents, bulkload, opType);
}
-
+
@Override
- public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload)
- throws HyracksDataException {
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
+ LSMOperationType opType) throws HyracksDataException {
//enter the LSM components to be replicated to prevent them from being deleted until they are replicated
if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) {
return;
}
- lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE);
+ lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE, opType);
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java
index 3b7a0bc..b520e8e3 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java
@@ -22,25 +22,44 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.impl.AbstractReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
public class LSMIndexReplicationJob extends AbstractReplicationJob implements ILSMIndexReplicationJob {
- private final AbstractLSMIndex lsmIndex;
- private final ILSMIndexOperationContext ctx;
+ private final ILSMIndex lsmIndex;
+ private final ILSMIndexOperationContext operationContext;
+ private final LSMOperationType LSMOpType;
- public LSMIndexReplicationJob(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx,
- Set<String> filesToReplicate, ReplicationOperation operation, ReplicationExecutionType executionType) {
+ public LSMIndexReplicationJob(ILSMIndex lsmIndex, ILSMIndexOperationContext operationContext,
+ Set<String> filesToReplicate, ReplicationOperation operation, ReplicationExecutionType executionType,
+ LSMOperationType opType) {
super(ReplicationJobType.LSM_COMPONENT, operation, executionType, filesToReplicate);
this.lsmIndex = lsmIndex;
- this.ctx = ctx;
+ this.operationContext = operationContext;
+ this.LSMOpType = opType;
}
@Override
public void endReplication() throws HyracksDataException {
- if (ctx != null) {
- lsmIndex.lsmHarness.endReplication(ctx);
+ if (operationContext != null) {
+ ((AbstractLSMIndex) (lsmIndex)).lsmHarness.endReplication(operationContext);
}
}
+
+ @Override
+ public ILSMIndex getLSMIndex() {
+ return lsmIndex;
+ }
+
+ @Override
+ public ILSMIndexOperationContext getLSMIndexOperationContext() {
+ return operationContext;
+ }
+
+ @Override
+ public LSMOperationType getLSMOpType() {
+ return LSMOpType;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 05dbfd9..7913626 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -132,11 +132,11 @@
}
@Override
- public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException {
+ public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException {
ctx.setOperation(IndexOperation.REPLICATE);
ctx.getComponentsToBeReplicated().clear();
ctx.getComponentsToBeReplicated().addAll(lsmComponents);
- lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload);
+ lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload, opType);
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 551d8c5..7dceb1d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -158,7 +158,7 @@
}
@Override
- public IIOReplicationManager getIIOReplicationManager() {
+ public IIOReplicationManager getIOReplicationManager() {
return null;
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index c6d3045..ff0c6a7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -384,7 +384,7 @@
}
@Override
- public IIOReplicationManager getIIOReplicationManager() {
+ public IIOReplicationManager getIOReplicationManager() {
return null;
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index e2307ac..3866c80 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
import java.util.List;
@@ -33,6 +32,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
@@ -100,13 +100,14 @@
ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
-
+
@Override
- public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException {
+ public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+ throws HyracksDataException {
ctx.setOperation(IndexOperation.REPLICATE);
ctx.getComponentsToBeReplicated().clear();
ctx.getComponentsToBeReplicated().addAll(lsmComponents);
- lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload);
+ lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload, opType);
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 89b5d26..6c699da 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -38,9 +38,9 @@
// TODO: Refactor for better code sharing with other file managers.
public class LSMInvertedIndexFileManager extends AbstractLSMIndexFileManager implements IInvertedIndexFileNameMapper {
- private static final String DICT_BTREE_SUFFIX = "b";
- private static final String INVLISTS_SUFFIX = "i";
- private static final String DELETED_KEYS_BTREE_SUFFIX = "d";
+ public static final String DICT_BTREE_SUFFIX = "b";
+ public static final String INVLISTS_SUFFIX = "i";
+ public static final String DELETED_KEYS_BTREE_SUFFIX = "d";
// We only need a BTree factory because the inverted indexes consistency is validated against its dictionary BTree.
private final BTreeFactory btreeFactory;
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 2cc3f73..4f50980 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -40,8 +40,8 @@
import org.apache.hyracks.storage.common.file.IFileMapProvider;
public class LSMRTreeFileManager extends AbstractLSMIndexFileManager {
- private static final String RTREE_STRING = "r";
- private static final String BTREE_STRING = "b";
+ public static final String RTREE_STRING = "r";
+ public static final String BTREE_STRING = "b";
private final TreeIndexFactory<? extends ITreeIndex> rtreeFactory;
private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index facd8c1..58ba988 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -832,7 +832,7 @@
}
@Override
- public IIOReplicationManager getIIOReplicationManager() {
+ public IIOReplicationManager getIOReplicationManager() {
return ioReplicationManager;
}
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
index 2211af7..10ce65a 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -204,7 +204,7 @@
}
@Override
- public IIOReplicationManager getIIOReplicationManager() {
+ public IIOReplicationManager getIOReplicationManager() {
return null;
}
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
index 24d6255..dc4ee3b 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
@@ -56,9 +56,9 @@
public int getFileReferenceCount(int fileId);
public void close() throws HyracksDataException;
-
+
public boolean isReplicationEnabled();
- public IIOReplicationManager getIIOReplicationManager();
+ public IIOReplicationManager getIOReplicationManager();
}