Added a new merge policy.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 18a81e5..76b3a22 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -29,6 +29,7 @@
import edu.uci.ics.asterix.common.context.AsterixFileMapManager;
import edu.uci.ics.asterix.common.context.ConstantMergePolicy;
import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
+import edu.uci.ics.asterix.common.context.PrefixMergePolicy;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -110,7 +111,7 @@
AsynchronousScheduler.INSTANCE.init(ncApplicationContext.getThreadFactory());
lsmIOScheduler = AsynchronousScheduler.INSTANCE;
- mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
+ mergePolicy = new PrefixMergePolicy(10485760, 5, this);//new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
ioManager);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
index cf69bfe..4e1239f 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.common.context;
+import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
@@ -39,11 +40,11 @@
@Override
public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
- List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+ List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
if (!ctx.isShuttingdown() && immutableComponents.size() >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, immutableComponents);
}
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrefixMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrefixMergePolicy.java
new file mode 100644
index 0000000..d975c2a
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrefixMergePolicy.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.context;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractDiskLSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+public class PrefixMergePolicy implements ILSMMergePolicy {
+
+ private final long maxMergableComponentSize;
+ private final int maxTolernaceComponentCount;
+ private final IAsterixAppRuntimeContext ctx;
+
+ public PrefixMergePolicy(long maxMergableComponentSize, int maxTolernaceComponentCount,
+ IAsterixAppRuntimeContext ctx) {
+ this.maxMergableComponentSize = maxMergableComponentSize;
+ this.maxTolernaceComponentCount = maxTolernaceComponentCount;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void diskComponentAdded(final ILSMIndex index) throws HyracksDataException, IndexException {
+ if (!ctx.isShuttingdown()) {
+ // 1. Look at the candidate components for merging in oldest-first order. If one exists, identify the prefix of the sequence of
+ // all such components for which the sum of their sizes exceeds MaxMrgCompSz. Schedule a merge of those components into a new component.
+ // 2. If a merge from 1 doesn't happen, see if the set of candidate components for merging exceeds MaxTolCompCnt. If so, schedule
+ // a merge all of the current candidates into a new single component.
+ List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
+ long totalSize = 0;
+ int startIndex = -1;
+ for (int i = 0; i < immutableComponents.size(); i++) {
+ ILSMComponent c = immutableComponents.get(i);
+ long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+ if (componentSize > maxMergableComponentSize) {
+ startIndex = i;
+ continue;
+ }
+ totalSize += componentSize;
+ boolean isLastComponent = i + 1 == immutableComponents.size() ? true : false;
+ if (totalSize > maxMergableComponentSize
+ || (isLastComponent && i - startIndex >= maxTolernaceComponentCount)) {
+ List<ILSMComponent> mergableCopments = new ArrayList<ILSMComponent>();
+ for (int j = startIndex + 1; j <= i; j++) {
+ mergableCopments.add(immutableComponents.get(j));
+ }
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE, mergableCopments);
+ break;
+ }
+ }
+ }
+ }
+}