Introducing data replication API to LSM indexes
Change-Id: I80565fc9d74e30440d2df5917911904ba8f33c25
Reviewed-on: https://asterix-gerrit.ics.uci.edu/322
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IIOReplicationManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IIOReplicationManager.java
new file mode 100644
index 0000000..d2e7011
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IIOReplicationManager.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.replication;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+public interface IIOReplicationManager extends ILifeCycleComponent {
+
+ public void submitJob(IReplicationJob job) throws IOException;
+
+ public boolean isReplicationEnabled();
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IReplicationJob.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IReplicationJob.java
new file mode 100644
index 0000000..4d3b58d
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/IReplicationJob.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.replication;
+
+import java.util.Set;
+
+public interface IReplicationJob {
+
+ public enum ReplicationJobType {
+ LSM_COMPONENT,
+ METADATA
+ }
+
+ public enum ReplicationOperation {
+ REPLICATE,
+ DELETE,
+ STOP
+ }
+
+ public enum ReplicationExecutionType {
+ ASYNC,
+ SYNC
+ }
+
+ public ReplicationJobType getJobType();
+
+ public ReplicationOperation getOperation();
+
+ public ReplicationExecutionType getExecutionType();
+
+ public Set<String> getJobFiles();
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/impl/AbstractReplicationJob.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/impl/AbstractReplicationJob.java
new file mode 100644
index 0000000..f73c9e1
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/replication/impl/AbstractReplicationJob.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.replication.impl;
+
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.replication.IReplicationJob;
+
+public abstract class AbstractReplicationJob implements IReplicationJob {
+
+ private final Set<String> filesToReplicate;
+ private final ReplicationOperation operation;
+ private final ReplicationExecutionType executionType;
+ private final ReplicationJobType jobType;
+
+ public AbstractReplicationJob(ReplicationJobType jobType, ReplicationOperation operation, ReplicationExecutionType executionType, Set<String> filesToReplicate){
+ this.jobType = jobType;
+ this.operation = operation;
+ this.executionType = executionType;
+ this.filesToReplicate = filesToReplicate;
+ }
+
+ @Override
+ public Set<String> getJobFiles() {
+ return filesToReplicate;
+ }
+
+ @Override
+ public ReplicationOperation getOperation() {
+ return operation;
+ }
+
+ @Override
+ public ReplicationExecutionType getExecutionType() {
+ return executionType;
+ }
+
+ @Override
+ public ReplicationJobType getJobType() {
+ return jobType;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
index 938464f..de4ce5a 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
@@ -30,4 +30,5 @@
public void close(long resourceID) throws HyracksDataException;
public List<IIndex> getOpenIndexes();
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index d23d484..5cc7602 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -41,7 +41,7 @@
// We store the page id that will be used to store the information of the the filter that is associated with a disk component.
// It is only set in the first meta page other meta pages (i.e., with level -2) have junk in the max page field.
private static final int additionalFilteringPageOff = validOff + 4; // 29
- protected static final int lsnOff = additionalFilteringPageOff + 4; // 33
+ public static final int lsnOff = additionalFilteringPageOff + 4; // 33
protected ICachedPage page = null;
protected ByteBuffer buf = null;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
index b5c0a1d..9d00adb 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -26,5 +26,6 @@
NOOP,
MERGE,
FULL_MERGE,
- FLUSH
+ FLUSH,
+ REPLICATE
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java
index 5b3ccb7..1fdfe28 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/util/IndexFileNameUtil.java
@@ -19,7 +19,9 @@
public class IndexFileNameUtil {
+ public static final String IO_DEVICE_NAME_PREFIX = "device_id_";
+
public static String prepareFileName(String path, int ioDeviceId) {
- return path + File.separator + "device_id_" + ioDeviceId;
+ return path + File.separator + IO_DEVICE_NAME_PREFIX + ioDeviceId;
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index 088a15b..f350e51 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -440,6 +440,9 @@
case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
break;
+ case REPLICATE:
+ operationalComponents.addAll(ctx.getComponentsToBeReplicated());
+ break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
index 68ec7d1..a51e460 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeOpContext.java
@@ -39,6 +39,7 @@
public final ISearchOperationCallback searchCallback;
private final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
private final int targetIndexVersion;
public ISearchPredicate searchPredicate;
@@ -63,6 +64,7 @@
}
this.componentHolder = new LinkedList<ILSMComponent>();
this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
this.searchCallback = searchCallback;
this.targetIndexVersion = targetIndexVersion;
}
@@ -77,6 +79,7 @@
public void reset() {
componentHolder.clear();
componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
}
public IndexOperation getOperation() {
@@ -123,5 +126,10 @@
public ISearchPredicate getSearchPredicate() {
return searchPredicate;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 1a4b85f..184ac9b 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -15,8 +15,10 @@
package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
import java.io.IOException;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -479,6 +481,8 @@
break;
case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
+ case REPLICATE:
+ operationalComponents.addAll(ctx.getComponentsToBeReplicated());
break;
case FLUSH:
// Do nothing. this is left here even though the index never
@@ -878,4 +882,16 @@
public boolean isPrimaryIndex() {
return false;
}
+
+ @Override
+ public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
+ Set<String> files = new HashSet<String>();
+
+ LSMBTreeWithBuddyDiskComponent component = (LSMBTreeWithBuddyDiskComponent) lsmComponent;
+ files.add(component.getBTree().getFileReference().toString());
+ files.add(component.getBuddyBTree().getFileReference().toString());
+ files.add(component.getBloomFilter().getFileReference().toString());
+
+ return files;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
index 645810a..e898e3b 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java
@@ -32,6 +32,7 @@
private MultiComparator buddyBTreeCmp;
public final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
public final ISearchOperationCallback searchCallback;
private final int targetIndexVersion;
public ISearchPredicate searchPredicate;
@@ -42,6 +43,7 @@
this.componentHolder = new LinkedList<ILSMComponent>();
this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
this.searchCallback = searchCallback;
this.targetIndexVersion = targetIndexVersion;
this.bTreeCmp = MultiComparator.create(btreeCmpFactories);
@@ -62,6 +64,7 @@
public void reset() {
componentHolder.clear();
componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
}
@Override
@@ -111,4 +114,9 @@
public ISearchPredicate getSearchPredicate() {
return searchPredicate;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 6d2f78c..b1b6837 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -17,7 +17,9 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -332,6 +334,9 @@
case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
break;
+ case REPLICATE:
+ operationalComponents.addAll(ctx.getComponentsToBeReplicated());
+ break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
@@ -845,4 +850,16 @@
public boolean isPrimaryIndex() {
return needKeyDupCheck;
}
+
+ @Override
+ public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
+
+ Set<String> files = new HashSet<String>();
+ LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent;
+
+ files.add(component.getBTree().getFileReference().toString());
+ files.add(component.getBloomFilter().getFileReference().toString());
+
+ return files;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index a2598ea..419a765 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -51,6 +51,7 @@
public final ISearchOperationCallback searchCallback;
private final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
public final PermutingTupleReference indexTuple;
public final MultiComparator filterCmp;
public final PermutingTupleReference filterTuple;
@@ -92,6 +93,7 @@
}
this.componentHolder = new LinkedList<ILSMComponent>();
this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
@@ -126,6 +128,7 @@
public void reset() {
componentHolder.clear();
componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
}
public IndexOperation getOperation() {
@@ -187,4 +190,9 @@
public ISearchPredicate getSearchPredicate() {
return searchPredicate;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 1903998..a5e57c1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+import java.util.List;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
@@ -52,4 +54,10 @@
public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
public ILSMOperationTracker getOperationTracker();
+
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload)
+ throws HyracksDataException;
+
+ public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 36a2ca1..2b0d64b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -114,4 +114,6 @@
public void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
index be67611..04f7760 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -18,6 +18,7 @@
import java.util.List;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
@@ -80,5 +81,8 @@
public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException;
public boolean isCurrentMutableComponentEmpty() throws HyracksDataException;
+
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
+ ReplicationOperation operation) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 954ae1b..86a145c 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -35,4 +35,6 @@
public void setSearchPredicate(ISearchPredicate searchPredicate);
public ISearchPredicate getSearchPredicate();
+
+ public List<ILSMComponent> getComponentsToBeReplicated();
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java
new file mode 100644
index 0000000..1791084
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexReplicationJob.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.replication.IReplicationJob;
+
+public interface ILSMIndexReplicationJob extends IReplicationJob {
+
+ public void endReplication() throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
index 8c1d826..d8dd7ea 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractDiskLSMComponent.java
@@ -37,6 +37,7 @@
switch (opType) {
case FORCE_MODIFICATION:
case MODIFICATION:
+ case REPLICATE:
case SEARCH:
readerCount++;
break;
@@ -68,6 +69,7 @@
}
case FORCE_MODIFICATION:
case MODIFICATION:
+ case REPLICATE:
case SEARCH:
readerCount--;
if (readerCount == 0 && state == ComponentState.READABLE_MERGING) {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index a14e3a7..d947155 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -15,13 +15,18 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
+import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -32,6 +37,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -85,7 +91,7 @@
this.filterFields = filterFields;
this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
this.durable = durable;
- lsmHarness = new LSMHarness(this, mergePolicy, opTracker);
+ lsmHarness = new LSMHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
isActivated = false;
diskComponents = new ArrayList<ILSMComponent>();
memoryComponents = new ArrayList<ILSMComponent>();
@@ -108,7 +114,7 @@
this.ioScheduler = ioScheduler;
this.ioOpCallback = ioOpCallback;
this.durable = durable;
- lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker);
+ lsmHarness = new ExternalIndexHarness(this, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled());
isActivated = false;
diskComponents = new LinkedList<ILSMComponent>();
this.inactiveDiskComponents = new LinkedList<ILSMComponent>();
@@ -293,4 +299,35 @@
public void addInactiveDiskComponent(ILSMComponent diskComponent) {
inactiveDiskComponents.add(diskComponent);
}
+
+ public abstract Set<String> getLSMComponentPhysicalFiles(ILSMComponent newComponent);
+
+ @Override
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
+ ReplicationOperation operation) throws HyracksDataException {
+ //get set of files to be replicated for this component
+ Set<String> componentFiles = new HashSet<String>();
+
+ //get set of files to be replicated for each component
+ for (ILSMComponent lsmComponent : lsmComponents) {
+ componentFiles.addAll(getLSMComponentPhysicalFiles(lsmComponent));
+ }
+
+ ReplicationExecutionType executionType;
+ if (bulkload) {
+ executionType = ReplicationExecutionType.SYNC;
+ } else {
+ executionType = ReplicationExecutionType.ASYNC;
+ }
+
+ //create replication job and submit it
+ LSMIndexReplicationJob job = new LSMIndexReplicationJob(this, ctx, componentFiles, operation,
+ executionType);
+ try {
+ diskBufferCache.getIIOReplicationManager().submitJob(job);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
index 1d93331..c45ce58 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMemoryLSMComponent.java
@@ -82,6 +82,7 @@
}
}
break;
+ case REPLICATE:
case SEARCH:
if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE
|| state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
@@ -126,6 +127,7 @@
}
}
break;
+ case REPLICATE:
case SEARCH:
readerCount--;
if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index d82489e..7bf1c40 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -19,6 +19,7 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
@@ -36,8 +37,8 @@
public class ExternalIndexHarness extends LSMHarness {
private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
- public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
- super(lsmIndex, mergePolicy, opTracker);
+ public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) {
+ super(lsmIndex, mergePolicy, opTracker, replicationEnabled);
}
@Override
@@ -111,6 +112,11 @@
c.threadExit(opType, failedOperation, false);
switch (c.getState()) {
case INACTIVE:
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(c);
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE);
+ }
((AbstractDiskLSMComponent) c).destroy();
break;
default:
@@ -124,6 +130,11 @@
if (newComponent != null) {
beforeSubsumeMergedComponents(newComponent, ctx.getComponentHolder());
lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated, false);
+ }
mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
break;
@@ -223,6 +234,11 @@
lsmIndex.markAsValid(c);
synchronized (opTracker) {
lsmIndex.addComponent(c);
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(c);
+ triggerReplication(componentsToBeReplicated, true);
+ }
// Enter the component
enterComponent(c);
mergePolicy.diskComponentAdded(lsmIndex, false);
@@ -311,6 +327,11 @@
private void exitComponent(ILSMComponent diskComponent) throws HyracksDataException {
diskComponent.threadExit(LSMOperationType.SEARCH, false, false);
if (diskComponent.getState() == ILSMComponent.ComponentState.INACTIVE) {
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(diskComponent);
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE);
+ }
((AbstractDiskLSMComponent) diskComponent).destroy();
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index d861404..a5f3e33 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -22,10 +23,12 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.replication.IReplicationJob.ReplicationOperation;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
@@ -33,6 +36,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -45,12 +49,18 @@
protected final ILSMMergePolicy mergePolicy;
protected final ILSMOperationTracker opTracker;
protected final AtomicBoolean fullMergeIsRequested;
-
- public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
+ protected final boolean replicationEnabled;
+ protected List<ILSMComponent> componentsToBeReplicated;
+
+ public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, boolean replicationEnabled) {
this.lsmIndex = lsmIndex;
this.opTracker = opTracker;
this.mergePolicy = mergePolicy;
fullMergeIsRequested = new AtomicBoolean();
+ this.replicationEnabled = replicationEnabled;
+ if (replicationEnabled) {
+ this.componentsToBeReplicated = new ArrayList<ILSMComponent>();
+ }
}
protected boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType,
@@ -213,6 +223,11 @@
// newComponent is null if the flush op. was not performed.
if (newComponent != null) {
lsmIndex.addComponent(newComponent);
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated, false);
+ }
mergePolicy.diskComponentAdded(lsmIndex, false);
}
break;
@@ -220,6 +235,11 @@
// newComponent is null if the merge op. was not performed.
if (newComponent != null) {
lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated, false);
+ }
mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
break;
@@ -269,6 +289,12 @@
*/
if (inactiveDiskComponentsToBeDeleted != null) {
try {
+ //schedule a replication job to delete these inactive disk components from replicas
+ if (replicationEnabled) {
+ lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false,
+ ReplicationOperation.DELETE);
+ }
+
for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
((AbstractDiskLSMComponent) c).destroy();
}
@@ -427,6 +453,11 @@
lsmIndex.markAsValid(c);
synchronized (opTracker) {
lsmIndex.addComponent(c);
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(c);
+ triggerReplication(componentsToBeReplicated, true);
+ }
mergePolicy.diskComponentAdded(lsmIndex, false);
}
}
@@ -435,4 +466,31 @@
public ILSMOperationTracker getOperationTracker() {
return opTracker;
}
+
+ protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException {
+ ILSMIndexAccessorInternal accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleReplication(lsmComponents, bulkload);
+ }
+
+ @Override
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload)
+ throws HyracksDataException {
+
+ //enter the LSM components to be replicated to prevent them from being deleted until they are replicated
+ if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) {
+ return;
+ }
+
+ lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE);
+ }
+
+ @Override
+ public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ try {
+ exitComponents(ctx, LSMOperationType.REPLICATE, null, false);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java
new file mode 100644
index 0000000..232bfb2
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexReplicationJob.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.replication.impl.AbstractReplicationJob;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
+
+public class LSMIndexReplicationJob extends AbstractReplicationJob implements ILSMIndexReplicationJob {
+
+ private final AbstractLSMIndex lsmIndex;
+ private final ILSMIndexOperationContext ctx;
+
+ public LSMIndexReplicationJob(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx,
+ Set<String> filesToReplicate, ReplicationOperation operation, ReplicationExecutionType executionType) {
+ super(ReplicationJobType.LSM_COMPONENT, operation, executionType, filesToReplicate);
+ this.lsmIndex = lsmIndex;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void endReplication() throws HyracksDataException {
+ if (ctx != null) {
+ lsmIndex.lsmHarness.endReplication(ctx);
+ }
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java
index 853b6d0..070b0da 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java
@@ -19,5 +19,6 @@
MODIFICATION,
FORCE_MODIFICATION,
FLUSH,
- MERGE
+ MERGE,
+ REPLICATE
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index c828bd2..9d2cf36 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -126,6 +126,14 @@
ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
+
+ @Override
+ public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException {
+ ctx.setOperation(IndexOperation.REPLICATE);
+ ctx.getComponentsToBeReplicated().clear();
+ ctx.getComponentsToBeReplicated().addAll(lsmComponents);
+ lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload);
+ }
@Override
public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 3c009c1..63b3349 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -16,6 +16,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.replication.IIOReplicationManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
@@ -146,4 +147,13 @@
return 0;
}
+ @Override
+ public boolean isReplicationEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIOReplicationManager getIIOReplicationManager() {
+ return null;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index 80b2897..d5d04fb 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.replication.IIOReplicationManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
@@ -371,4 +372,14 @@
public int getFileReferenceCount(int fileId) {
return 0;
}
+
+ @Override
+ public boolean isReplicationEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIOReplicationManager getIIOReplicationManager() {
+ return null;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 7bfb378..4250e6d 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -16,7 +16,9 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -324,6 +326,9 @@
case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
break;
+ case REPLICATE:
+ operationalComponents.addAll(ctx.getComponentsToBeReplicated());
+ break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
@@ -934,4 +939,19 @@
public boolean isPrimaryIndex() {
return false;
}
+
+ @Override
+ public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
+ Set<String> files = new HashSet<String>();
+
+ LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) lsmComponent;
+ OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) invIndexComponent.getInvIndex();
+
+ files.add(invIndex.getInvListsFile().toString());
+ files.add(invIndex.getBTree().toString());
+ files.add(invIndexComponent.getBloomFilter().getFileReference().toString());
+ files.add(invIndexComponent.getDeletedKeysBTree().getFileReference().toString());
+
+ return files;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index f21cc2a..03a2691 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -96,6 +96,14 @@
ctx.getComponentsToBeMerged().addAll(components);
lsmHarness.scheduleMerge(ctx, callback);
}
+
+ @Override
+ public void scheduleReplication(List<ILSMComponent> lsmComponents, boolean bulkload) throws HyracksDataException {
+ ctx.setOperation(IndexOperation.REPLICATE);
+ ctx.getComponentsToBeReplicated().clear();
+ ctx.getComponentsToBeReplicated().addAll(lsmComponents);
+ lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload);
+ }
@Override
public void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
@@ -171,4 +179,5 @@
throws HyracksDataException, IndexException {
throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index.");
}
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 7a8305d..1b1f65b 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -38,6 +38,7 @@
private IndexOperation op;
private final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
@@ -64,6 +65,7 @@
int[] invertedIndexFields, int[] filterFields) throws HyracksDataException {
this.componentHolder = new LinkedList<ILSMComponent>();
this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
@@ -105,6 +107,7 @@
public void reset() {
componentHolder.clear();
componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
}
@Override
@@ -154,4 +157,9 @@
public ISearchPredicate getSearchPredicate() {
return searchPredicate;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 4f866a7..4b93100 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -270,6 +270,9 @@
case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
break;
+ case REPLICATE:
+ operationalComponents.addAll(ctx.getComponentsToBeReplicated());
+ break;
default:
throw new UnsupportedOperationException("Operation " + ctx.getOperation() + " not supported.");
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index 66a9250..ea66b46 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -483,6 +483,9 @@
case FULL_MERGE:
operationalComponents.addAll(immutableComponents);
break;
+ case REPLICATE:
+ operationalComponents.addAll(ctx.getComponentsToBeReplicated());
+ break;
case FLUSH:
// Do nothing. this is left here even though the index never
// performs flushes because a flush is triggered by
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
index 0ce83dc..b49199d 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
@@ -32,6 +32,7 @@
private MultiComparator rTreeCmp;
public final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
public final ISearchOperationCallback searchCallback;
private final int targetIndexVersion;
public ISearchPredicate searchPredicate;
@@ -42,6 +43,7 @@
this.componentHolder = new LinkedList<ILSMComponent>();
this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
this.searchCallback = searchCallback;
this.targetIndexVersion = targetIndexVersion;
this.bTreeCmp = MultiComparator.create(btreeCmpFactories);
@@ -62,6 +64,7 @@
public void reset() {
componentHolder.clear();
componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
}
@Override
@@ -111,4 +114,9 @@
public ISearchPredicate getSearchPredicate() {
return searchPredicate;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 825874a..b84fb8b 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -16,7 +16,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -617,4 +619,18 @@
forceFlushDirtyPages(component.getBTree());
markAsValidInternal(component.getBTree());
}
+
+ @Override
+ public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
+ Set<String> files = new HashSet<String>();
+
+ LSMRTreeDiskComponent component = (LSMRTreeDiskComponent) lsmComponent;
+
+ files.add(component.getBTree().getFileReference().toString());
+ files.add(component.getRTree().getFileReference().toString());
+ files.add(component.getBloomFilter().getFileReference().toString());
+
+ return files;
+ }
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index b2c939a..4ff05e6 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -51,6 +51,7 @@
private IndexOperation op;
public final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
public final IModificationOperationCallback modificationCallback;
public final ISearchOperationCallback searchCallback;
public final PermutingTupleReference indexTuple;
@@ -86,6 +87,7 @@
currentBTreeOpContext = btreeOpContexts[0];
this.componentHolder = new LinkedList<ILSMComponent>();
this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
this.modificationCallback = modificationCallback;
this.searchCallback = searchCallback;
@@ -162,4 +164,9 @@
public ISearchPredicate getSearchPredicate() {
return searchPredicate;
}
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index f1703b1..4eb1709 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -16,7 +16,9 @@
package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -447,4 +449,15 @@
forceFlushDirtyPages(rtree);
markAsValidInternal(rtree);
}
+
+ @Override
+ public Set<String> getLSMComponentPhysicalFiles(ILSMComponent lsmComponent) {
+ Set<String> files = new HashSet<String>();
+
+ RTree rtree = ((LSMRTreeDiskComponent) lsmComponent).getRTree();
+ files.add(rtree.getFileReference().toString());
+
+ return files;
+ }
+
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index f5b5b17..94ad801 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.api.replication.IIOReplicationManager;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
@@ -55,9 +56,8 @@
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private final Set<Integer> virtualFiles;
-
+ private IIOReplicationManager ioReplicationManager;
private List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>();
-
private boolean closed;
public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
@@ -83,6 +83,15 @@
closed = false;
}
+ //this constructor is used when replication is enabled to pass the IIOReplicationManager
+ public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
+ IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
+ ThreadFactory threadFactory, IIOReplicationManager ioReplicationManager) {
+
+ this(ioManager, pageReplacementStrategy, pageCleanerPolicy, fileMapManager, maxOpenFiles, threadFactory);
+ this.ioReplicationManager = ioReplicationManager;
+ }
+
@Override
public int getPageSize() {
return pageSize;
@@ -366,7 +375,7 @@
buffer.append("Buffer cache state\n");
buffer.append("Page Size: ").append(pageSize).append('\n');
buffer.append("Number of physical pages: ").append(pageReplacementStrategy.getMaxAllowedNumPages())
- .append('\n');
+ .append('\n');
buffer.append("Hash table size: ").append(pageMap.length).append('\n');
buffer.append("Page Map:\n");
int nCachedPages = 0;
@@ -379,10 +388,10 @@
buffer.append(" ").append(i).append('\n');
while (cp != null) {
buffer.append(" ").append(cp.cpid).append(" -> [")
- .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
- .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
- .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
- .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+ .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+ .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+ .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+ .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
cp = cp.next;
++nCachedPages;
}
@@ -808,4 +817,17 @@
public void dumpState(OutputStream os) throws IOException {
os.write(dumpState().getBytes());
}
+
+ @Override
+ public boolean isReplicationEnabled() {
+ if (ioReplicationManager != null) {
+ return ioReplicationManager.isReplicationEnabled();
+ }
+ return false;
+ }
+
+ @Override
+ public IIOReplicationManager getIIOReplicationManager() {
+ return ioReplicationManager;
+ }
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 3a88091..3716445 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.replication.IIOReplicationManager;
/**
* Implementation of an IBufferCache that counts the number of pins/unpins,
@@ -192,5 +193,14 @@
public int getFileReferenceCount(int fileId) {
return bufferCache.getFileReferenceCount(fileId);
}
+
+ @Override
+ public boolean isReplicationEnabled() {
+ return false;
+ }
+ @Override
+ public IIOReplicationManager getIIOReplicationManager() {
+ return null;
+ }
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index 478c641..957d19d 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -16,6 +16,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.replication.IIOReplicationManager;
public interface IBufferCache {
public void createFile(FileReference fileRef) throws HyracksDataException;
@@ -51,4 +52,9 @@
public int getFileReferenceCount(int fileId);
public void close() throws HyracksDataException;
+
+ public boolean isReplicationEnabled();
+
+ public IIOReplicationManager getIIOReplicationManager();
+
}
\ No newline at end of file