Merge commit '95f508bbb1d07650392c21da2958ba1c53f2a03d' from stabilization-f69489
Change-Id: Ib6a428564681938dceb6ad4a2ba91f15256dc8a2
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
index fa9385a..683d29f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/InjectTypeCastForSwitchCaseRule.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.dataflow.data.common.TypeResolverUtil;
import org.apache.asterix.lang.common.util.FunctionUtil;
@@ -38,14 +39,23 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import com.google.common.collect.ImmutableSet;
+
/**
- * This rule injects cast functions for "THEN" and "ELSE" branches of a switch-case function if
- * different "THEN" and "ELSE" branches have heterogeneous return types.
+ * This rule injects casts for function parameters if they have heterogeneous return types:
+ * <ul>
+ * <li>for "THEN" and "ELSE" branches of a switch-case function</li>
+ * <li>for parameters of "if missing/null" functions (if-missing(), if-null(), if-missing-or-null())</li>
+ * </ul>
*/
public class InjectTypeCastForSwitchCaseRule implements IAlgebraicRewriteRule {
+ private static final Set<FunctionIdentifier> IF_FUNCTIONS =
+ ImmutableSet.of(BuiltinFunctions.IF_MISSING, BuiltinFunctions.IF_NULL, BuiltinFunctions.IF_MISSING_OR_NULL);
+
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
@@ -80,10 +90,17 @@
rewritten = true;
}
}
- if (!func.getFunctionIdentifier().equals(BuiltinFunctions.SWITCH_CASE)) {
- return rewritten;
+ FunctionIdentifier funcId = func.getFunctionIdentifier();
+ if (funcId.equals(BuiltinFunctions.SWITCH_CASE)) {
+ if (rewriteSwitchCase(op, func, context)) {
+ rewritten = true;
+ }
+ } else if (IF_FUNCTIONS.contains(funcId)) {
+ if (rewriteFunction(op, func, context)) {
+ rewritten = true;
+ }
}
- return rewriteSwitchCase(op, func, context);
+ return rewritten;
}
// Injects casts that cast types for different "THEN" and "ELSE" branches.
@@ -96,20 +113,44 @@
boolean rewritten = false;
for (int argIndex = 2; argIndex < argSize; argIndex += (argIndex + 2 == argSize) ? 1 : 2) {
Mutable<ILogicalExpression> argRef = argRefs.get(argIndex);
- IAType type = (IAType) env.getType(argRefs.get(argIndex).getValue());
- if (TypeResolverUtil.needsCast(producedType, type)) {
- ILogicalExpression argExpr = argRef.getValue();
- // Injects a cast call to cast the data type to the produced type of the switch-case function call.
- ScalarFunctionCallExpression castFunc =
- new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.CAST_TYPE),
- new ArrayList<>(Collections.singletonList(new MutableObject<>(argExpr))));
- castFunc.setSourceLocation(argExpr.getSourceLocation());
- TypeCastUtils.setRequiredAndInputTypes(castFunc, producedType, type);
- argRef.setValue(castFunc);
+ if (rewriteFunctionArgument(argRef, producedType, env)) {
rewritten = true;
}
}
return rewritten;
}
+ // Injects casts so that every function argument matches the produced type of the function call
+ private boolean rewriteFunction(ILogicalOperator op, AbstractFunctionCallExpression func,
+ IOptimizationContext context) throws AlgebricksException {
+ IVariableTypeEnvironment env = op.computeInputTypeEnvironment(context);
+ IAType producedType = (IAType) env.getType(func);
+ List<Mutable<ILogicalExpression>> argRefs = func.getArguments();
+ int argSize = argRefs.size();
+ boolean rewritten = false;
+ for (int argIndex = 0; argIndex < argSize; argIndex++) {
+ Mutable<ILogicalExpression> argRef = argRefs.get(argIndex);
+ if (rewriteFunctionArgument(argRef, producedType, env)) {
+ rewritten = true;
+ }
+ }
+ return rewritten;
+ }
+
+ private boolean rewriteFunctionArgument(Mutable<ILogicalExpression> argRef, IAType funcOutputType,
+ IVariableTypeEnvironment env) throws AlgebricksException {
+ ILogicalExpression argExpr = argRef.getValue();
+ IAType type = (IAType) env.getType(argExpr);
+ if (TypeResolverUtil.needsCast(funcOutputType, type)) {
+ // Injects a cast call to cast the data type to the produced type of the function call.
+ ScalarFunctionCallExpression castFunc =
+ new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.CAST_TYPE),
+ new ArrayList<>(Collections.singletonList(new MutableObject<>(argExpr))));
+ castFunc.setSourceLocation(argExpr.getSourceLocation());
+ TypeCastUtils.setRequiredAndInputTypes(castFunc, funcOutputType, type);
+ argRef.setValue(castFunc);
+ return true;
+ }
+ return false;
+ }
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index d2b456f..0ea16ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -220,13 +220,17 @@
List<ILogicalOperator> subRoots = new ArrayList<>();
for (Pair<IAccessMethod, Index> pair : chosenIndexes) {
AccessMethodAnalysisContext analysisCtx = analyzedAMs.get(pair.first);
- subRoots.add(pair.first.createIndexSearchPlan(afterSelectRefs, selectRef, conditionRef,
- subTree.getAssignsAndUnnestsRefs(), subTree, null, pair.second, analysisCtx,
- AccessMethodUtils.retainInputs(subTree.getDataSourceVariables(),
- subTree.getDataSourceRef().getValue(), afterSelectRefs),
- false, subTree.getDataSourceRef().getValue().getInputs().get(0).getValue()
- .getExecutionMode() == ExecutionMode.UNPARTITIONED,
- context, null));
+ boolean retainInput = AccessMethodUtils.retainInputs(subTree.getDataSourceVariables(),
+ subTree.getDataSourceRef().getValue(), afterSelectRefs);
+ boolean requiresBroadcast = subTree.getDataSourceRef().getValue().getInputs().get(0).getValue()
+ .getExecutionMode() == ExecutionMode.UNPARTITIONED;
+ ILogicalOperator subRoot = pair.first.createIndexSearchPlan(afterSelectRefs, selectRef, conditionRef,
+ subTree.getAssignsAndUnnestsRefs(), subTree, null, pair.second, analysisCtx, retainInput, false,
+ requiresBroadcast, context, null);
+ if (subRoot == null) {
+ return false;
+ }
+ subRoots.add(subRoot);
}
// Connect each secondary index utilization plan to a common intersect operator.
ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 64d8e93..3c62d99 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -30,7 +30,6 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
-import java.util.Optional;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IndexCheckpoint;
@@ -56,7 +55,7 @@
}
@Override
- public synchronized void init(String lastComponentTimestamp, long lsn) throws HyracksDataException {
+ public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException {
List<IndexCheckpoint> checkpoints;
try {
checkpoints = getCheckpoints();
@@ -67,25 +66,24 @@
LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
delete();
}
- IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lastComponentTimestamp, lsn);
+ IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn);
persist(firstCheckpoint);
}
@Override
- public synchronized void replicated(String componentTimestamp, long masterLsn, long componentId)
+ public synchronized void replicated(long componentSequence, long masterLsn, long componentId)
throws HyracksDataException {
final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
if (localLsn == null) {
throw new IllegalStateException("Component flushed before lsn mapping was received");
}
- flushed(componentTimestamp, localLsn, componentId);
+ flushed(componentSequence, localLsn, componentId);
}
@Override
- public synchronized void flushed(String componentTimestamp, long lsn, long componentId)
- throws HyracksDataException {
+ public synchronized void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
- IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp, componentId);
+ IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentSequence, componentId);
persist(nextCheckpoint);
deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
}
@@ -95,7 +93,7 @@
final IndexCheckpoint latest = getLatest();
latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
- latest.getValidComponentTimestamp(), latest.getLastComponentId());
+ latest.getValidComponentSequence(), latest.getLastComponentId());
persist(next);
notifyAll();
}
@@ -119,9 +117,8 @@
}
@Override
- public Optional<String> getValidComponentTimestamp() throws HyracksDataException {
- String validComponentTimestamp = getLatest().getValidComponentTimestamp();
- return validComponentTimestamp != null ? Optional.of(validComponentTimestamp) : Optional.empty();
+ public long getValidComponentSequence() throws HyracksDataException {
+ return getLatest().getValidComponentSequence();
}
@Override
@@ -153,18 +150,17 @@
@Override
public synchronized void setLastComponentId(long componentId) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
- final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
- latest.getValidComponentTimestamp(), componentId);
+ final IndexCheckpoint next =
+ IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentSequence(), componentId);
persist(next);
}
@Override
- public synchronized void advanceValidComponentTimestamp(String timestamp) throws HyracksDataException {
+ public synchronized void advanceValidComponentSequence(long componentSequence) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
- if (latest.getValidComponentTimestamp() == null
- || timestamp.compareTo(latest.getValidComponentTimestamp()) > 0) {
- final IndexCheckpoint next =
- IndexCheckpoint.next(latest, latest.getLowWatermark(), timestamp, latest.getLastComponentId());
+ if (componentSequence > latest.getValidComponentSequence()) {
+ final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence,
+ latest.getLastComponentId());
persist(next);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index adf9960..946815f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -300,7 +300,6 @@
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
ILogRecord logRecord = null;
- ILSMComponentIdGenerator idGenerator = null;
try {
logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -389,8 +388,7 @@
int partition = logRecord.getResourcePartition();
if (partitions.contains(partition)) {
int datasetId = logRecord.getDatasetId();
- idGenerator = datasetLifecycleManager.getComponentIdGenerator(datasetId, partition);
- if (idGenerator == null) {
+ if (!datasetLifecycleManager.isRegistered(datasetId)) {
// it's possible this dataset has been dropped
logRecord = logReader.next();
continue;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 017c59f..5aa1b36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -56,6 +56,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -75,6 +76,7 @@
private static ITransactionContext txnCtx;
private static LSMInsertDeleteOperatorNodePushable insertOp;
private static final int PARTITION = 0;
+ private static String indexPath;
@BeforeClass
public static void setUp() throws Exception {
@@ -109,6 +111,7 @@
txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+ indexPath = indexDataflowHelper.getResource().getPath();
}
@After
@@ -148,9 +151,9 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -163,8 +166,8 @@
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+ next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -216,9 +219,9 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -248,8 +251,8 @@
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+ next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -302,9 +305,9 @@
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -329,8 +332,8 @@
secondSearcher.waitUntilEntered();
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).refresh();
+ next = dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -738,9 +741,10 @@
public void run() {
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
- ILSMComponentId next =
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).getId();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath)
+ .refresh();
+ ILSMComponentId next = dsLifecycleMgr
+ .getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION, indexPath).getId();
long flushLsn = nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index 108afa7..c727f52 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -28,7 +28,7 @@
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -64,8 +64,8 @@
// Whenever this is called, it resets the counter
// However, the counters for the failed operations are never reset since we expect them
// To be always 0
- return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
- getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
+ return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(),
+ getIndexCheckpointManagerProvider());
}
public int getTotalFlushes() {
@@ -125,9 +125,9 @@
public class TestLsmIoOpCallback extends LSMIOOperationCallback {
private final TestLsmBtree lsmBtree;
- public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentId id,
+ public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentIdGenerator idGenerator,
IIndexCheckpointManagerProvider checkpointManagerProvider) {
- super(dsInfo, index, id, checkpointManagerProvider);
+ super(dsInfo, index, idGenerator.getId(), checkpointManagerProvider);
lsmBtree = (TestLsmBtree) index;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 22ee542..c4390fa 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -55,12 +55,13 @@
DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition);
+ PrimaryIndexOperationTracker opTracker =
+ dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
- dslcManager.getComponentIdGenerator(datasetId, partition));
+ dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath()));
replaceMapEntry(opTrackersField, dsr, partition, opTracker);
}
return opTracker;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 694a0c7..54ae683 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -18,18 +18,11 @@
*/
package org.apache.asterix.test.storage;
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
-
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.text.Format;
-import java.text.SimpleDateFormat;
import java.util.Arrays;
-import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.TestDataUtil;
@@ -129,17 +122,15 @@
TestDataUtil.upsertData(datasetName, 100);
ncAppCtx.getDatasetLifecycleManager().flushDataset(dataset.getDatasetId(), false);
- // create new invalid component with a timestamp > checkpoint valid component timestamp (i.e. in the future)
- Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
- Date futureTime = new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5));
- String invalidComponentTimestamp =
- formatter.format(futureTime) + AbstractLSMIndexFileManager.DELIMITER + formatter.format(futureTime);
+ // create a new invalid component whose sequence is greater than the checkpoint's valid component sequence
+ String invalidComponentId = "1000";
+ String invalidComponentRange = invalidComponentId + AbstractLSMIndexFileManager.DELIMITER + invalidComponentId;
FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
String indexDir = indexDirRef.getFile().getAbsolutePath();
// create the invalid component files
- Path btreePath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+ Path btreePath = Paths.get(indexDir, invalidComponentRange + AbstractLSMIndexFileManager.DELIMITER
+ AbstractLSMIndexFileManager.BTREE_SUFFIX);
- Path filterPath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+ Path filterPath = Paths.get(indexDir, invalidComponentRange + AbstractLSMIndexFileManager.DELIMITER
+ AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX);
Files.createFile(btreePath);
Files.createFile(filterPath);
@@ -156,14 +147,14 @@
DatasetResourceReference drr = DatasetResourceReference.of(localResource);
IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
- Optional<String> validComponentTimestamp = indexCheckpointManager.getValidComponentTimestamp();
- Assert.assertTrue(validComponentTimestamp.isPresent());
+ long validComponentSequence = indexCheckpointManager.getValidComponentSequence();
+ Assert.assertTrue(validComponentSequence > Long.MIN_VALUE);
File[] indexRemainingFiles =
indexDirRef.getFile().listFiles(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
Assert.assertNotNull(indexRemainingFiles);
long validComponentFilesCount = Arrays.stream(indexRemainingFiles)
- .filter(file -> file.getName().startsWith(validComponentTimestamp.get())).count();
+ .filter(file -> file.getName().startsWith(String.valueOf(validComponentSequence))).count();
Assert.assertTrue(validComponentFilesCount > 0);
}
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index/btree-secondary-68.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index/btree-secondary-68.sqlpp
new file mode 100644
index 0000000..af04479
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/btree-index/btree-secondary-68.sqlpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Secondary BTree Index intersection with between operator (ASTERIXDB-2448)
+ * Expected Result : Success
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+create type tpch.OrderType as
+ closed {
+ o_orderkey : bigint,
+ o_custkey : bigint,
+ o_orderstatus : string,
+ o_totalprice : double,
+ o_orderdate : string,
+ o_orderpriority : string,
+ o_clerk : string,
+ o_shippriority : bigint,
+ o_comment : string
+};
+
+create dataset Orders(OrderType) primary key o_orderkey;
+
+create index idx_custkey on Orders (o_custkey) type btree;
+
+create index idx_orderpriority on Orders (o_orderpriority) type btree;
+
+select o_custkey, o_orderkey, o_orderstatus from Orders
+where
+ o_orderpriority = '1-URGENT' and
+ o_custkey between 40 and 43
+order by o_custkey, o_orderkey;
+
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index/btree-secondary-68.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index/btree-secondary-68.plan
new file mode 100644
index 0000000..80def88
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/btree-index/btree-secondary-68.plan
@@ -0,0 +1,32 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$25(ASC), $$26(ASC) ] |PARTITIONED|
+ -- STABLE_SORT [$$25(ASC), $$26(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- STREAM_SELECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INTERSECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$35(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$39(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/binary/concat2/concat2.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/binary/concat2/concat2.1.query.sqlpp
new file mode 100644
index 0000000..782cce2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/binary/concat2/concat2.1.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+ 't1': binary_concat([null,hex('aa')]),
+ 't2': binary_concat([hex('aa'),null]),
+ 't3': binary_concat([null,missing,hex('aa')]),
+ 't4': binary_concat([hex('aa'),null,missing])
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.1.query.sqlpp
new file mode 100644
index 0000000..554cf28
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.1.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Ensure an error is raised if there is a duplicate field name in the SELECT clause
+ */
+
+select t as a, 2 as a from range(1, 10) t
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.2.query.sqlpp
new file mode 100644
index 0000000..b548b5d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.2.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Ensure an error is raised if there is a duplicate field name in the closed-object-constructor() function
+ */
+
+set `import-private-functions` `true`;
+
+`closed-object-constructor`('b',get_year(current_date()),'c',[20],'c',[30]);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.3.query.sqlpp
new file mode 100644
index 0000000..73810d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446/query-ASTERIXDB-2446.3.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Ensure an error is raised if there is a duplicate field name in the open-object-constructor() function
+ */
+
+set `import-private-functions` `true`;
+
+`open-object-constructor`('d' || to_string(get_year(current_date())),10,'e',[20],'e',[30]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.1.ddl.sqlpp
new file mode 100644
index 0000000..cb51012
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.1.ddl.sqlpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Secondary BTree Index intersection with between operator (ASTERIXDB-2448)
+ * Expected Result : Success
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+create type tpch.OrderType as
+ closed {
+ o_orderkey : bigint,
+ o_custkey : bigint,
+ o_orderstatus : string,
+ o_totalprice : double,
+ o_orderdate : string,
+ o_orderpriority : string,
+ o_clerk : string,
+ o_shippriority : bigint,
+ o_comment : string
+};
+
+create dataset Orders(OrderType) primary key o_orderkey;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.2.update.sqlpp
new file mode 100644
index 0000000..1097325
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.2.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load dataset Orders using localfs ((`path`=`asterix_nc1://data/tpch0.001/orders.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.3.ddl.sqlpp
new file mode 100644
index 0000000..62f4cc0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.3.ddl.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+create index idx_custkey on Orders (o_custkey) type btree;
+
+create index idx_orderpriority on Orders (o_orderpriority) type btree;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.4.query.sqlpp
new file mode 100644
index 0000000..75212af
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/index-selection/intersection-with-between/intersection-with-between.4.query.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select o_custkey, o_orderkey, o_orderstatus from Orders
+where
+ o_orderpriority = '1-URGENT' and
+ o_custkey between 40 and 43
+order by o_custkey, o_orderkey
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp
index 2f0d837..0abb997 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissing/ifmissing.1.query.sqlpp
@@ -29,5 +29,12 @@
missing,
case when get_year(current_datetime()) > 0 then missing else false end,
case when get_year(current_datetime()) > 0 then true else null end
+ ),
+ "j": (
+ let v = if_missing(
+ case when get_year(current_datetime()) > 0 then missing else false end,
+ { "c": [ 2 ] }
+ )
+ select v as b
)
};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp
index 32f040f..22a8acd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifmissingornull/ifmissingornull.1.query.sqlpp
@@ -32,5 +32,12 @@
case when get_year(current_datetime()) > 0 then missing else false end,
case when get_year(current_datetime()) > 0 then null else false end,
case when get_year(current_datetime()) > 0 then true else missing end
+ ),
+ "j": (
+ let v = if_missing_or_null(
+ case when get_year(current_datetime()) > 0 then missing else false end,
+ { "c": [ 2 ] }
+ )
+ select v as b
)
};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp
index c0683bd..0121cd8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/null-missing/ifnull/ifnull.1.query.sqlpp
@@ -30,5 +30,12 @@
null,
case when get_year(current_datetime()) > 0 then null else false end,
case when get_year(current_datetime()) > 0 then true else missing end
+ ),
+ "j": (
+ let v = if_null(
+ case when get_year(current_datetime()) > 0 then null else false end,
+ { "c": [ 2 ] }
+ )
+ select v as b
)
};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.1.query.sqlpp
new file mode 100644
index 0000000..3e48631
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.1.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+ 't1': string_concat([null,'aa']),
+ 't2': string_concat(['aa',null]),
+ 't3': string_concat([null,missing,'aa']),
+ 't4': string_concat(['aa',null,missing])
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.2.query.sqlpp
new file mode 100644
index 0000000..12208a7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/string/string-concat2/string-concat2.2.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+{
+ 't1': null || 'aa',
+ 't2': 'aa' || null,
+ 't3': null || missing || 'aa',
+ 't4': 'aa' || null || missing
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/binary/concat2/concat2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/binary/concat2/concat2.1.adm
new file mode 100644
index 0000000..770f2cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/binary/concat2/concat2.1.adm
@@ -0,0 +1 @@
+{ "t1": null, "t2": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-between/intersection-with-between.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-between/intersection-with-between.4.adm
new file mode 100644
index 0000000..ad86590
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/index-selection/intersection-with-between/intersection-with-between.4.adm
@@ -0,0 +1,7 @@
+{ "o_custkey": 40, "o_orderkey": 323, "o_orderstatus": "F" }
+{ "o_custkey": 40, "o_orderkey": 3653, "o_orderstatus": "F" }
+{ "o_custkey": 40, "o_orderkey": 4934, "o_orderstatus": "O" }
+{ "o_custkey": 43, "o_orderkey": 258, "o_orderstatus": "F" }
+{ "o_custkey": 43, "o_orderkey": 2596, "o_orderstatus": "O" }
+{ "o_custkey": 43, "o_orderkey": 3687, "o_orderstatus": "F" }
+{ "o_custkey": 43, "o_orderkey": 5378, "o_orderstatus": "F" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm
index 7fa0bce..131b860 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/limit/push-limit-to-primary-scan-select/push-limit-to-primary-scan-select.8.adm
@@ -20,7 +20,7 @@
-- ASSIGN |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$22, $$t] <- test.TweetMessages condition (and(le(0, $$t.getField("user").getField("friends_count")), le($$t.getField("user").getField("friends_count"), 150))) limit 2
+ data-scan []<-[$$22, $$t] <- test.TweetMessages condition (and(ge($$t.getField("user").getField("friends_count"), 0), le($$t.getField("user").getField("friends_count"), 150))) limit 2
-- DATASOURCE_SCAN |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm
index 0a2275f..14620a1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissing/ifmissing.1.adm
@@ -1 +1 @@
-{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "i": true }
\ No newline at end of file
+{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "i": true, "j": [ { "b": { "c": [ 2 ] } } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm
index 633c503..eff2651 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifmissingornull/ifmissingornull.1.adm
@@ -1 +1 @@
-{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true }
\ No newline at end of file
+{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true, "j": [ { "b": { "c": [ 2 ] } } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm
index 633c503..eff2651 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/null-missing/ifnull/ifnull.1.adm
@@ -1 +1 @@
-{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true }
\ No newline at end of file
+{ "a": true, "b": true, "c": true, "d": true, "e": true, "f": true, "g": true, "h": true, "i": true, "j": [ { "b": { "c": [ 2 ] } } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.1.adm
new file mode 100644
index 0000000..770f2cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.1.adm
@@ -0,0 +1 @@
+{ "t1": null, "t2": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.2.adm
new file mode 100644
index 0000000..770f2cd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/string/string-concat2/string-concat2.2.adm
@@ -0,0 +1 @@
+{ "t1": null, "t2": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 0425b79..b5fc053 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -92,6 +92,14 @@
<expected-error>Duplicate alias definitions: s1</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="flwor">
+ <compilation-unit name="query-ASTERIXDB-2446">
+ <output-dir compare="Text">query-ASTERIXDB-883</output-dir>
+ <expected-error>ASX0013: Duplicate field name "a"</expected-error>
+ <expected-error>ASX0013: Duplicate field name "c"</expected-error>
+ <expected-error>ASX0013: Duplicate field name "e"</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="explain">
<test-case FilePath="explain">
@@ -3367,6 +3375,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="index-selection">
+ <compilation-unit name="intersection-with-between">
+ <output-dir compare="Text">intersection-with-between</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="index-selection">
<compilation-unit name="inverted-index-ngram-contains">
<output-dir compare="Text">inverted-index-ngram-contains</output-dir>
</compilation-unit>
@@ -6611,6 +6624,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="string">
+ <compilation-unit name="string-concat2">
+ <output-dir compare="Text">string-concat2</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="string">
<compilation-unit name="string-equal1">
<output-dir compare="Text">string-equal1</output-dir>
</compilation-unit>
@@ -9829,6 +9847,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="binary">
+ <compilation-unit name="concat2">
+ <output-dir compare="Text">concat2</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="binary">
<compilation-unit name="subbinary">
<output-dir compare="Text">subbinary</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 4441c6e..d18b6ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -40,6 +40,15 @@
IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
/**
+ * Indicates whether the dataset with id {@code datasetId} is currently registered
+ * with this {@link IDatasetLifecycleManager}.
+ *
+ * @param datasetId the id of the dataset to check
+ * @return {@code true} if the dataset is currently registered, otherwise {@code false}
+ */
+ boolean isRegistered(int datasetId);
+
+ /**
* Flushes all open datasets synchronously.
*
* @throws HyracksDataException
@@ -76,18 +85,20 @@
*
* @param datasetId
* @param partition
+ * @param path the path of an index resource belonging to the dataset
* @return
*/
- PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition);
+ PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path);
/**
* creates (if necessary) and returns the component Id generator of a dataset.
*
* @param datasetId
* @param partition
+ * @param path the path of an index resource belonging to the dataset
* @return
*/
- ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition);
+ ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path);
/**
* creates (if necessary) and returns the dataset virtual buffer caches.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 6476e68..811da78 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -53,7 +53,7 @@
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- return dslcManager.getComponentIdGenerator(datasetId, partition);
+ return dslcManager.getComponentIdGenerator(datasetId, partition, resource.getPath());
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 50a4bef..1dff69d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.context;
import static org.apache.asterix.common.metadata.MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS;
+import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
import java.io.IOException;
import java.io.OutputStream;
@@ -37,7 +38,9 @@
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
@@ -322,32 +325,36 @@
}
@Override
- public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) {
+ public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
DatasetResource dataset = datasets.get(datasetId);
PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
if (opTracker == null) {
- populateOpTrackerAndIdGenerator(dataset, partition);
+ populateOpTrackerAndIdGenerator(dataset, partition, path);
opTracker = dataset.getOpTracker(partition);
}
return opTracker;
}
@Override
- public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
+ public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
DatasetResource dataset = datasets.get(datasetId);
- if (dataset == null) {
- return null;
- }
ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
if (generator == null) {
- populateOpTrackerAndIdGenerator(dataset, partition);
+ populateOpTrackerAndIdGenerator(dataset, partition, path);
generator = dataset.getComponentIdGenerator(partition);
}
return generator;
}
- private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) {
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum());
+ @Override
+ public synchronized boolean isRegistered(int datasetId) {
+ return datasets.containsKey(datasetId);
+ }
+
+ private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition, String path) {
+ final long lastValidId = getDatasetLastValidComponentId(path);
+ ILSMComponentIdGenerator idGenerator =
+ new LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(), lastValidId);
PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
logManager, dataset.getDatasetInfo(), idGenerator);
dataset.setPrimaryIndexOperationTracker(partition, opTracker);
@@ -600,4 +607,18 @@
indexInfo.setOpen(false);
}
}
+
+ private long getDatasetLastValidComponentId(String indexPath) {
+ try {
+ final ResourceReference indexRef = ResourceReference.ofIndex(indexPath);
+ final ResourceReference primaryIndexRef = indexRef.getDatasetReference();
+ final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(primaryIndexRef);
+ if (indexCheckpointManager.getCheckpointCount() > 0) {
+ return Math.max(indexCheckpointManager.getLatest().getLastComponentId(), MIN_VALID_COMPONENT_ID);
+ }
+ return MIN_VALID_COMPONENT_ID;
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index ea53d68..0aa46a8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -44,9 +44,9 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -73,12 +73,12 @@
private int pendingFlushes = 0;
private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
- public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId nextComponentId,
+ public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId componentId,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
- componentIds.add(nextComponentId);
+ componentIds.add(componentId);
}
@Override
@@ -132,8 +132,8 @@
operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? (Long) map.get(KEY_FLUSH_LOG_LSN) : 0L;
final LSMComponentId id = (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID);
final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
- indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn, id.getMaxId());
+ final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
+ indexCheckpointManagerProvider.get(ref).flushed(componentSequence, lsn, id.getMaxId());
}
private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
@@ -275,6 +275,6 @@
@Override
public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
- // No Op
+ // no op
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index 2c0872c..dd9ede5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -27,33 +27,33 @@
/**
* Initializes the first checkpoint of an index with low watermark {@code lsn}
*
- * @param componentTimestamp
+ * @param validComponentSequence
* @param lsn
* @throws HyracksDataException
*/
- void init(String componentTimestamp, long lsn) throws HyracksDataException;
+ void init(long validComponentSequence, long lsn) throws HyracksDataException;
/**
* Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
- * with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
+ * with the latest valid {@code componentSequence} and low watermark {@code lsn}
*
- * @param componentTimestamp
+ * @param componentSequence
* @param lsn
* @throws HyracksDataException
*/
- void flushed(String componentTimestamp, long lsn, long componentId) throws HyracksDataException;
+ void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException;
/**
* Called when a new LSM disk component is replicated from master. When called, the index checkpoint is updated
- * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
+ * with the latest valid {@code componentSequence} and the local lsn mapping of {@code masterLsn} is set as the
* new low watermark.
*
- * @param componentTimestamp
+ * @param componentSequence
* @param masterLsn
* @param componentId
* @throws HyracksDataException
*/
- void replicated(String componentTimestamp, long masterLsn, long componentId) throws HyracksDataException;
+ void replicated(long componentSequence, long masterLsn, long componentId) throws HyracksDataException;
/**
* Called when a flush log is received and replicated from master. The mapping between
@@ -88,12 +88,12 @@
void delete();
/**
- * Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
+ * Gets the index last valid component sequence.
*
- * @return the index last valid component timestamp
+ * @return the index last valid component sequence
* @throws HyracksDataException
*/
- Optional<String> getValidComponentTimestamp() throws HyracksDataException;
+ long getValidComponentSequence() throws HyracksDataException;
/**
* Gets the number of valid checkpoints the index has.
@@ -110,12 +110,12 @@
IndexCheckpoint getLatest() throws HyracksDataException;
/**
- * Advance the last valid component timestamp. Used for replicated bulkloaded components
+ * Advance the last valid component sequence. Used for replicated bulkloaded components
*
- * @param timeStamp
+ * @param componentSequence
* @throws HyracksDataException
*/
- void advanceValidComponentTimestamp(String timeStamp) throws HyracksDataException;
+ void advanceValidComponentSequence(long componentSequence) throws HyracksDataException;
/**
* Set the last component id. Used during recovery or after component delete
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 73d3122..f84167e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -34,22 +34,22 @@
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long INITIAL_CHECKPOINT_ID = 0;
private long id;
- private String validComponentTimestamp;
+ private long validComponentSequence;
private long lowWatermark;
private long lastComponentId;
private Map<Long, Long> masterNodeFlushMap;
- public static IndexCheckpoint first(String lastComponentTimestamp, long lowWatermark) {
+ public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) {
IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
firstCheckpoint.lowWatermark = lowWatermark;
- firstCheckpoint.validComponentTimestamp = lastComponentTimestamp;
+ firstCheckpoint.validComponentSequence = lastComponentSequence;
firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
firstCheckpoint.masterNodeFlushMap = new HashMap<>();
return firstCheckpoint;
}
- public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp,
+ public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
long lastComponentId) {
if (lowWatermark < latest.getLowWatermark()) {
throw new IllegalStateException("Low watermark should always be increasing");
@@ -58,7 +58,7 @@
next.id = latest.getId() + 1;
next.lowWatermark = lowWatermark;
next.lastComponentId = lastComponentId;
- next.validComponentTimestamp = validComponentTimestamp;
+ next.validComponentSequence = validComponentSequence;
next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
// remove any lsn from the map that wont be used anymore
next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
@@ -69,8 +69,8 @@
private IndexCheckpoint() {
}
- public String getValidComponentTimestamp() {
- return validComponentTimestamp;
+ public long getValidComponentSequence() {
+ return validComponentSequence;
}
public long getLowWatermark() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 8bd848d..a24bf72 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -38,6 +38,10 @@
protected ResourceReference() {
}
+ public static ResourceReference ofIndex(String indexPath) {
+ return of(new File(indexPath, StorageConstants.METADATA_FILE_NAME).toString());
+ }
+
public static ResourceReference of(String localResourcePath) {
ResourceReference lrr = new ResourceReference();
parse(lrr, localResourcePath);
@@ -72,6 +76,11 @@
return Paths.get(root, partition, dataverse, dataset, rebalance, index);
}
+ public ResourceReference getDatasetReference() {
+ return ResourceReference
+ .ofIndex(Paths.get(root, partition, dataverse, dataset, rebalance, dataset).toFile().getPath());
+ }
+
public Path getFileRelativePath() {
return Paths.get(root, partition, dataverse, dataset, rebalance, index, name);
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index f0dfd1a..7158b95 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -43,8 +43,8 @@
# Data errors
6 = Invalid format for %1$s in %2$s
-7 = Overflow happend in %1$s
-8 = Underflow happend in %1$s
+7 = Overflow in %1$s
+8 = Underflow in %1$s
9 = Injected failure in %1$s
10 = Invalid value: function %1$s expects its %2$s input parameter to be a non-negative value, but gets %3$s
11 = Index out of bound in %1$s: %2$s
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 29a2aa0..db0911b 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -19,9 +19,8 @@
package org.apache.asterix.test.ioopcallbacks;
-import java.text.Format;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
+
import java.util.HashMap;
import java.util.Map;
@@ -38,7 +37,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -64,13 +62,11 @@
* 7. destroy
*/
- private static final Format FORMATTER =
- new SimpleDateFormat(AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT);
+ private static long COMPONENT_SEQUENCE = 0;
private static String getComponentFileName() {
- Date date = new Date();
- String ts = FORMATTER.format(date);
- return ts + '_' + ts;
+ final String sequence = String.valueOf(COMPONENT_SEQUENCE++);
+ return sequence + '_' + sequence;
}
@Test
@@ -82,7 +78,7 @@
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
DatasetInfo dsInfo = new DatasetInfo(101, null);
- LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+ LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
mockIndexCheckpointManagerProvider());
//Flush first
@@ -141,7 +137,7 @@
public void testAllocateComponentId() throws HyracksDataException {
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
@@ -153,9 +149,6 @@
idGenerator.refresh();
long flushLsn = 1L;
ILSMComponentId nextComponentId = idGenerator.getId();
- Map<String, Object> flushMap = new HashMap<>();
- flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
- flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
callback.allocated(mockComponent);
callback.recycled(mockComponent);
checkMemoryComponent(initialId, mockComponent);
@@ -165,7 +158,7 @@
public void testRecycleComponentId() throws HyracksDataException {
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
@@ -223,7 +216,8 @@
IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
Mockito.mock(IIndexCheckpointManagerProvider.class);
IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
- Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong(), Mockito.anyLong());
+ Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.anyLong(), Mockito.anyLong(),
+ Mockito.anyLong());
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
return indexCheckpointManagerProvider;
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java
index 372f0fa..23b4d60 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/OperatorExpressionVisitor.java
@@ -136,13 +136,13 @@
Expression left = operatorExpr.getExprList().get(1);
Expression right = operatorExpr.getExprList().get(2);
- // Creates the expression left <= target.
- Expression leftComparison =
- createLessThanExpression(left, target, operatorExpr.getHints(), operatorExpr.getSourceLocation());
+ // Creates the expression target >= left.
+ Expression leftComparison = createOperatorExpression(OperatorType.GE, target, left, operatorExpr.getHints(),
+ operatorExpr.getSourceLocation());
// Creates the expression target <= right.
Expression targetCopy = (Expression) SqlppRewriteUtil.deepCopy(target);
- Expression rightComparison =
- createLessThanExpression(targetCopy, right, operatorExpr.getHints(), operatorExpr.getSourceLocation());
+ Expression rightComparison = createOperatorExpression(OperatorType.LE, targetCopy, right,
+ operatorExpr.getHints(), operatorExpr.getSourceLocation());
OperatorExpr andExpr = new OperatorExpr();
andExpr.addOperand(leftComparison);
andExpr.addOperand(rightComparison);
@@ -158,12 +158,12 @@
}
}
- private Expression createLessThanExpression(Expression lhs, Expression rhs, List<IExpressionAnnotation> hints,
- SourceLocation sourceLoc) {
+ private Expression createOperatorExpression(OperatorType opType, Expression lhs, Expression rhs,
+ List<IExpressionAnnotation> hints, SourceLocation sourceLoc) {
OperatorExpr comparison = new OperatorExpr();
comparison.addOperand(lhs);
comparison.addOperand(rhs);
- comparison.addOperator(OperatorType.LE);
+ comparison.addOperator(opType);
comparison.setSourceLocation(sourceLoc);
if (hints != null) {
for (IExpressionAnnotation hint : hints) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
index 561df5e..b7bd8ca 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/common/TypeResolverUtil.java
@@ -106,7 +106,7 @@
// Gets the actual types for UNIONs and mark unknownable to be true.
if (leftTypeTag == ATypeTag.UNION || rightTypeTag == ATypeTag.UNION) {
leftType = TypeComputeUtils.getActualType(leftType);
- rightType = TypeComputeUtils.getActualType(leftType);
+ rightType = TypeComputeUtils.getActualType(rightType);
leftTypeTag = leftType.getTypeTag();
rightTypeTag = rightType.getTypeTag();
unknownable = true;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index d8eeefc..49db062 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -68,6 +68,7 @@
import org.apache.asterix.om.typecomputer.impl.CollectionMemberResultType;
import org.apache.asterix.om.typecomputer.impl.CollectionToSequenceTypeComputer;
import org.apache.asterix.om.typecomputer.impl.ConcatNonNullTypeComputer;
+import org.apache.asterix.om.typecomputer.impl.ConcatTypeComputer;
import org.apache.asterix.om.typecomputer.impl.DoubleIfTypeComputer;
import org.apache.asterix.om.typecomputer.impl.FieldAccessByIndexResultType;
import org.apache.asterix.om.typecomputer.impl.FieldAccessByNameResultType;
@@ -1240,7 +1241,7 @@
addFunction(BINARY_LENGTH, UnaryBinaryInt64TypeComputer.INSTANCE, true);
addFunction(PARSE_BINARY, ABinaryTypeComputer.INSTANCE, true);
addFunction(PRINT_BINARY, AStringTypeComputer.INSTANCE, true);
- addFunction(BINARY_CONCAT, ABinaryTypeComputer.INSTANCE, true);
+ addFunction(BINARY_CONCAT, ConcatTypeComputer.INSTANCE_BINARY, true);
addFunction(SUBBINARY_FROM, ABinaryTypeComputer.INSTANCE, true);
addFunction(SUBBINARY_FROM_TO, ABinaryTypeComputer.INSTANCE, true);
addFunction(FIND_BINARY, AInt64TypeComputer.INSTANCE, true);
@@ -1251,7 +1252,7 @@
addFunction(STRING_CONTAINS, ABooleanTypeComputer.INSTANCE, true);
addFunction(STRING_TO_CODEPOINT, StringToInt64ListTypeComputer.INSTANCE, true);
addFunction(CODEPOINT_TO_STRING, AStringTypeComputer.INSTANCE, true);
- addFunction(STRING_CONCAT, AStringTypeComputer.INSTANCE, true);
+ addFunction(STRING_CONCAT, ConcatTypeComputer.INSTANCE_STRING, true);
addFunction(SUBSTRING2, StringIntToStringTypeComputer.INSTANCE_NULLABLE, true);
addFunction(STRING_LENGTH, UnaryStringInt64TypeComputer.INSTANCE, true);
addFunction(STRING_LOWERCASE, StringStringTypeComputer.INSTANCE, true);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
index d5e576e..3500435 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
@@ -21,6 +21,8 @@
import java.util.Iterator;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.om.exceptions.InvalidExpressionException;
import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
@@ -68,12 +70,18 @@
AUnionType unionType = (AUnionType) e2Type;
e2Type = AUnionType.createUnknownableType(unionType.getActualType());
}
- fieldTypes[i] = e2Type;
- fieldNames[i] = ConstantExpressionUtil.getStringConstant(e1);
- if (fieldNames[i] == null) {
+ String fieldName = ConstantExpressionUtil.getStringConstant(e1);
+ if (fieldName == null) {
throw new InvalidExpressionException(f.getSourceLocation(), funcName, 2 * i, e1,
LogicalExpressionTag.CONSTANT);
}
+ for (int j = 0; j < i; j++) {
+ if (fieldName.equals(fieldNames[j])) {
+ throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(), fieldName);
+ }
+ }
+ fieldTypes[i] = e2Type;
+ fieldNames[i] = fieldName;
i++;
}
return new ARecordType(null, fieldNames, fieldTypes, false);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ConcatTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ConcatTypeComputer.java
new file mode 100644
index 0000000..db59877
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ConcatTypeComputer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.typecomputer.impl;
+
+import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+
+public class ConcatTypeComputer extends AbstractResultTypeComputer {
+
+ public static final ConcatTypeComputer INSTANCE_STRING = new ConcatTypeComputer(BuiltinType.ASTRING);
+
+ public static final ConcatTypeComputer INSTANCE_BINARY = new ConcatTypeComputer(BuiltinType.ABINARY);
+
+ private final IAType resultType;
+
+ private ConcatTypeComputer(IAType resultType) {
+ this.resultType = resultType;
+ }
+
+ @Override
+ protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException {
+ IAType argType = strippedInputTypes[0];
+ IAType outputType = resultType;
+ if (!argType.getTypeTag().isListType() || isUnknownable(((AbstractCollectionType) argType).getItemType())) {
+ outputType = AUnionType.createUnknownableType(outputType);
+ }
+ return outputType;
+ }
+
+ private boolean isUnknownable(IAType type) {
+ switch (type.getTypeTag()) {
+ case ANY:
+ case MISSING:
+ case NULL:
+ return true;
+ case UNION:
+ return ((AUnionType) type).isUnknownableType();
+ default:
+ return false;
+ }
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
index 1442dfe..6deb17c 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
@@ -25,6 +25,8 @@
import java.util.List;
import java.util.Set;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
import org.apache.asterix.om.types.ARecordType;
@@ -72,6 +74,9 @@
IAType t2 = (IAType) env.getType(e2);
String fieldName = ConstantExpressionUtil.getStringConstant(e1);
if (fieldName != null && t2 != null && TypeHelper.isClosed(t2)) {
+ if (namesList.contains(fieldName)) {
+ throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(), fieldName);
+ }
namesList.add(fieldName);
if (t2.getTypeTag() == ATypeTag.UNION) {
AUnionType unionType = (AUnionType) t2;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index d4d601c..448613b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.common.LocalResource;
/**
@@ -62,20 +63,19 @@
DatasetResourceReference ref = DatasetResourceReference.of(ls);
final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
indexCheckpointManager.delete();
- // Get most recent timestamp of existing files to avoid deletion
+ // Get most recent sequence of existing files to avoid deletion
Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
if (files == null) {
throw HyracksDataException
.create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
}
- String mostRecentTimestamp = null;
+ long maxComponentSequence = Long.MIN_VALUE;
for (String file : files) {
- String nextTimeStamp = AbstractLSMIndexFileManager.getComponentEndTime(file);
- mostRecentTimestamp = mostRecentTimestamp == null || nextTimeStamp.compareTo(mostRecentTimestamp) > 0
- ? nextTimeStamp : mostRecentTimestamp;
+ maxComponentSequence =
+ Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
}
- indexCheckpointManager.init(mostRecentTimestamp, currentLSN);
+ indexCheckpointManager.init(maxComponentSequence, currentLSN);
}
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index d5dc51d..55dd5d4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -63,8 +63,8 @@
final IIOManager ioManager = appCtx.getIoManager();
final FileReference localPath = ioManager.resolve(componentFile);
final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
- final String componentId = PersistentLocalResourceRepository.getComponentId(componentFile);
- return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId);
+ final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile);
+ return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
}
@Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index a4f9b43..b360a09 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -37,6 +37,7 @@
import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
/**
* A task to mark a replicated LSM component as valid
@@ -57,7 +58,7 @@
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
if (masterLsn == IndexSynchronizer.BULKLOAD_LSN) {
- updateBulkLoadedLastComponentTimestamp(appCtx);
+ updateBulkLoadedLastComponentSequence(appCtx);
} else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
ensureComponentLsnFlushed(appCtx);
}
@@ -70,13 +71,12 @@
}
}
- private void updateBulkLoadedLastComponentTimestamp(INcApplicationContext appCtx) throws HyracksDataException {
+ private void updateBulkLoadedLastComponentSequence(INcApplicationContext appCtx) throws HyracksDataException {
final ResourceReference indexRef = ResourceReference.of(file);
final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
- indexCheckpointManager.advanceValidComponentTimestamp(componentEndTime);
-
+ final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
+ indexCheckpointManager.advanceValidComponentSequence(componentSequence);
}
private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
@@ -95,8 +95,8 @@
indexCheckpointManager.wait(replicationTimeOut);
replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
- indexCheckpointManager.replicated(componentEndTime, masterLsn, lastComponentId);
+ final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
+ indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId);
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 20663d1..f53d448 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -98,7 +98,7 @@
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
indexCheckpointManager.delete();
- indexCheckpointManager.init(null, currentLSN);
+ indexCheckpointManager.init(Long.MIN_VALUE, currentLSN);
LOGGER.info(() -> "Checkpoint index: " + indexRef);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
index cb213ae..a2872bf 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
@@ -94,6 +94,7 @@
listAccessor.reset(listBytes, listOffset);
// calculate length first
int utf8Len = 0;
+ boolean itemIsNull = false;
for (int i = 0; i < listAccessor.size(); i++) {
int itemOffset = listAccessor.getItemOffset(i);
ATypeTag itemType = listAccessor.getItemType(itemOffset);
@@ -104,9 +105,8 @@
}
if (itemType != ATypeTag.STRING) {
if (itemType == ATypeTag.NULL) {
- nullSerde.serialize(ANull.NULL, out);
- result.set(resultStorage);
- return;
+ itemIsNull = true;
+ continue;
}
if (itemType == ATypeTag.MISSING) {
missingSerde.serialize(AMissing.MISSING, out);
@@ -118,6 +118,11 @@
}
utf8Len += UTF8StringUtil.getUTFLength(listBytes, itemOffset);
}
+ if (itemIsNull) {
+ nullSerde.serialize(ANull.NULL, out);
+ result.set(resultStorage);
+ return;
+ }
out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
int cbytes = UTF8StringUtil.encodeUTF8Length(utf8Len, tempLengthArray, 0);
out.write(tempLengthArray, 0, cbytes);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
index 59f3396..4eaa1d1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/binary/BinaryConcatDescriptor.java
@@ -94,11 +94,17 @@
try {
listAccessor.reset(data, offset);
int concatLength = 0;
+ boolean itemIsNull = false;
for (int i = 0; i < listAccessor.size(); i++) {
int itemOffset = listAccessor.getItemOffset(i);
ATypeTag itemType = listAccessor.getItemType(itemOffset);
if (itemType != ATypeTag.BINARY) {
- if (serializeUnknownIfAnyUnknown(itemType)) {
+ if (itemType == ATypeTag.NULL) {
+ itemIsNull = true;
+ continue;
+ }
+ if (itemType == ATypeTag.MISSING) {
+ missingSerde.serialize(AMissing.MISSING, dataOutput);
result.set(resultStorage);
return;
}
@@ -107,6 +113,11 @@
}
concatLength += ByteArrayPointable.getContentLength(data, itemOffset);
}
+ if (itemIsNull) {
+ nullSerde.serialize(ANull.NULL, dataOutput);
+ result.set(resultStorage);
+ return;
+ }
dataOutput.writeByte(ATypeTag.SERIALIZED_BINARY_TYPE_TAG);
int metaLen = VarLenIntEncoderDecoder.encode(concatLength, metaBuffer, 0);
dataOutput.write(metaBuffer, 0, metaLen);
@@ -122,20 +133,6 @@
}
result.set(resultStorage);
}
-
- private boolean serializeUnknownIfAnyUnknown(ATypeTag... tags) throws HyracksDataException {
- for (ATypeTag typeTag : tags) {
- if (typeTag == ATypeTag.NULL) {
- nullSerde.serialize(ANull.NULL, dataOutput);
- return true;
- }
- if (typeTag == ATypeTag.MISSING) {
- missingSerde.serialize(AMissing.MISSING, dataOutput);
- return true;
- }
- }
- return false;
- }
};
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index 8ea1fa7..b123a5e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -26,12 +26,15 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
// To avoid the computation cost for checking the capacity constraint for each node,
// currently the admit/allocation decisions are based on the aggregated resource information.
// TODO(buyingyi): investigate partition-aware resource control.
public class JobCapacityController implements IJobCapacityController {
+ private static final Logger LOGGER = LogManager.getLogger();
private final IResourceManager resourceManager;
public JobCapacityController(IResourceManager resourceManager) {
@@ -71,6 +74,16 @@
int aggregatedNumCores = currentCapacity.getAggregatedCores();
currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + reqAggregatedMemoryByteSize);
currentCapacity.setAggregatedCores(aggregatedNumCores + reqAggregatedNumCores);
+ ensureMaxCapacity();
}
+ /**
+ * Sanity check: logs a warning when the current aggregated cores or aggregated memory of the cluster
+ * exceed the cluster's maximum capacity. This only reports the inconsistency; it does not adjust
+ * the current capacity.
+ */
+ private void ensureMaxCapacity() {
+ final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+ final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
+ if (currentCapacity.getAggregatedCores() > maximumCapacity.getAggregatedCores()
+ || currentCapacity.getAggregatedMemoryByteSize() > maximumCapacity.getAggregatedMemoryByteSize()) {
+ LOGGER.warn("Current cluster available capacity {} is more than its maximum capacity {}", currentCapacity,
+ maximumCapacity);
+ }
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index e353714..ab84211 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -49,7 +49,7 @@
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- return dslcManager.getOperationTracker(datasetId, partition);
+ return dslcManager.getOperationTracker(datasetId, partition, resource.getPath());
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index f9718c4..c0da095 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -22,7 +22,6 @@
import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
import java.io.File;
import java.io.FilenameFilter;
@@ -30,12 +29,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.text.Format;
import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -71,6 +67,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.util.ExitUtil;
@@ -129,9 +126,6 @@
}
};
- private static final ThreadLocal<SimpleDateFormat> THREAD_LOCAL_FORMATTER =
- ThreadLocal.withInitial(() -> new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT));
-
// Finals
private final IIOManager ioManager;
private final Cache<String, LocalResource> resourceCache;
@@ -202,7 +196,7 @@
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
final Path path = Paths.get(resourceFile.getAbsolutePath());
Files.write(path, bytes);
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0);
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0);
deleteResourceFileMask(resourceFile);
} catch (Exception e) {
cleanup(resourceFile);
@@ -481,30 +475,18 @@
}
private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
- final Format formatter = THREAD_LOCAL_FORMATTER.get();
final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
if (indexComponentFiles == null) {
throw new IOException(index + " doesn't exist or an IO error occurred");
}
- final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
- if (!validComponentTimestamp.isPresent()) {
- // index doesn't have any valid component, delete all
- for (File componentFile : indexComponentFiles) {
+ final long validComponentSequence = getIndexCheckpointManager(index).getValidComponentSequence();
+ for (File componentFile : indexComponentFiles) {
+ // delete any file with start sequence > valid component sequence
+ final long fileStart = IndexComponentFileReference.of(componentFile.getName()).getSequenceStart();
+ if (fileStart > validComponentSequence) {
LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
Files.delete(componentFile.toPath());
}
- } else {
- final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
- for (File componentFile : indexComponentFiles) {
- // delete any file with startTime > validTimestamp
- final String fileStartTimeStr =
- AbstractLSMIndexFileManager.getComponentStartTime(componentFile.getName());
- final Date fileStartTime = (Date) formatter.parseObject(fileStartTimeStr);
- if (fileStartTime.after(validTimestamp)) {
- LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
- Files.delete(componentFile.toPath());
- }
- }
}
}
@@ -545,8 +527,8 @@
long fileSize = file.length();
totalSize += fileSize;
if (isComponentFile(resolvedPath.getFile(), file.getName())) {
- String componentId = getComponentId(file.getAbsolutePath());
- componentsStats.put(componentId, componentsStats.getOrDefault(componentId, 0L) + fileSize);
+ String componentSeq = getComponentSequence(file.getAbsolutePath());
+ componentsStats.put(componentSeq, componentsStats.getOrDefault(componentSeq, 0L) + fileSize);
}
}
}
@@ -576,17 +558,16 @@
}
/**
- * Gets a component id based on its unique timestamp.
- * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
- * will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
+ * Gets a component sequence from the name of one of its component files.
+ * e.g. a component file 1_3_b
+ * will return a component sequence 1_3
*
- * @param componentFile
- * any component file
- * @return The component id
+ * @param componentFile any component file
+ * @return The component sequence
*/
- public static String getComponentId(String componentFile) {
+ public static String getComponentSequence(String componentFile) {
final ResourceReference ref = ResourceReference.of(componentFile);
- return ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
+ return IndexComponentFileReference.of(ref.getName()).getSequence();
}
private static boolean isComponentMask(File mask) {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index ff1e47f..962f826 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -97,4 +97,11 @@
* @throws HyracksDataException
*/
FileReference createWorkspaceFile(String prefix) throws HyracksDataException;
+
+ /**
+ * Gets the total disk usage, in bytes, of the IO devices handled by this {@link IIOManager}.
+ *
+ * @return the total disk usage in bytes
+ */
+ long getTotalDiskUsage();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 334fb5c..261e7c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -18,9 +18,11 @@
*/
package org.apache.hyracks.comm.channels;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
+import java.util.Objects;
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -75,7 +77,15 @@
break;
}
try {
- wait();
+ InetSocketAddress remoteAddress = ccb.getRemoteAddress();
+ String nameBefore = Thread.currentThread().getName();
+ try {
+ Thread.currentThread()
+ .setName(nameBefore + ":SendingTo(" + Objects.toString(remoteAddress) + ")");
+ wait();
+ } finally {
+ Thread.currentThread().setName(nameBefore);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index f7067ff..b2e4a5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -216,7 +216,7 @@
webServer.start();
info = new ClusterControllerInfo(ccId, ccConfig.getClientPublicAddress(), ccConfig.getClientPublicPort(),
ccConfig.getConsolePublicPort());
- timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
+ timer.schedule(sweeper, 0, ccConfig.getDeadNodeSweepThreshold());
jobLog.open();
startApplication();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 7dc636c..7e1ca61 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -229,8 +229,9 @@
}
run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
run.setEndTime(System.currentTimeMillis());
- if (activeRunMap.remove(jobId) == null) {
- LOGGER.warn("Job {} was not found running but is getting archived and capacity released", jobId);
+ if (activeRunMap.remove(jobId) != null) {
+ // non-active jobs have zero capacity
+ releaseJobCapacity(run);
}
runMapArchive.put(jobId, run);
runMapHistory.put(jobId, run.getExceptions());
@@ -247,10 +248,6 @@
}
}
- // Releases cluster capacitys occupied by the job.
- JobSpecification job = run.getJobSpecification();
- jobCapacityController.release(job);
-
// Picks the next job to execute.
pickJobsToRun();
@@ -347,4 +344,9 @@
throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
}
}
+
+ /**
+ * Releases the cluster capacity associated with the given job's specification back to the
+ * job capacity controller.
+ */
+ private void releaseJobCapacity(JobRun jobRun) {
+ final JobSpecification job = jobRun.getJobSpecification();
+ jobCapacityController.release(job);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df6e..1cb2d05 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -60,6 +60,7 @@
CONSOLE_PUBLIC_PORT(INTEGER, CONSOLE_LISTEN_PORT),
HEARTBEAT_PERIOD(LONG, 10000L), // TODO (mblow): add time unit
HEARTBEAT_MAX_MISSES(INTEGER, 5),
+ DEAD_NODE_SWEEP_THRESHOLD(LONG, HEARTBEAT_PERIOD),
PROFILE_DUMP_PERIOD(INTEGER, 0),
JOB_HISTORY_SIZE(INTEGER, 10),
RESULT_TTL(LONG, 86400000L), // TODO(mblow): add time unit
@@ -154,7 +155,9 @@
case HEARTBEAT_PERIOD:
return "Sets the time duration between two heartbeats from each node controller in milliseconds";
case HEARTBEAT_MAX_MISSES:
- return "Sets the maximum number of missed heartbeats before a node is marked as dead";
+ return "Sets the maximum number of missed heartbeats before a node can be considered dead";
+ case DEAD_NODE_SWEEP_THRESHOLD:
+ return "Sets the frequency (in milliseconds) to process nodes that can be considered dead";
case PROFILE_DUMP_PERIOD:
return "Sets the time duration between two profile dumps from each node controller in "
+ "milliseconds; 0 to disable";
@@ -326,6 +329,14 @@
configManager.set(Option.HEARTBEAT_MAX_MISSES, heartbeatMaxMisses);
}
+ /**
+ * @return the configured {@code DEAD_NODE_SWEEP_THRESHOLD} value in milliseconds; it defaults to the
+ * heartbeat period and is used as the scheduling period of the dead-node sweeper
+ */
+ public long getDeadNodeSweepThreshold() {
+ return getAppConfig().getLong(Option.DEAD_NODE_SWEEP_THRESHOLD);
+ }
+
+ /**
+ * @param deadNodeSweepThreshold the dead-node sweep period in milliseconds
+ */
+ public void setDeadNodeSweepThreshold(long deadNodeSweepThreshold) {
+ configManager.set(Option.DEAD_NODE_SWEEP_THRESHOLD, deadNodeSweepThreshold);
+ }
+
public int getProfileDumpPeriod() {
return getAppConfig().getInt(Option.PROFILE_DUMP_PERIOD);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
index 0ea6399..08b8c11 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
@@ -63,7 +63,6 @@
}
public void notifyAck(HyracksDataException exception) {
- // TODO: we should also reregister in case of no ack
LOGGER.debug("ack rec'd from {} w/ exception: {}", ccId::toString, () -> String.valueOf(exception));
if (exception != null && exception.matches(ErrorCode.HYRACKS, ErrorCode.NO_SUCH_NODE)) {
LOGGER.info("{} indicates it does not recognize us; force a reconnect", ccId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index d13ca8d..b5cb21a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -36,6 +36,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -416,4 +417,13 @@
Thread.currentThread().interrupt();
}
}
+
+ /**
+ * Sums the sizes of the mount directories of all IO devices managed by this IOManager.
+ * NOTE(review): commons-io FileUtils.sizeOfDirectory is documented to throw IllegalArgumentException
+ * when the directory does not exist, and may return a negative size on overflow; confirm every
+ * device mount exists when this is called.
+ */
+ @Override
+ public long getTotalDiskUsage() {
+ long totalSize = 0;
+ for (IODeviceHandle handle : ioDevices) {
+ totalSize += FileUtils.sizeOfDirectory(handle.getMount());
+ }
+ return totalSize;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 998acfb..f7ef2aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.net.protocols.muxdemux;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -163,4 +164,8 @@
+ " remoteClose: " + remoteClose + " remoteCloseAck:" + remoteCloseAck + " readCredits: "
+ ri.getCredits() + " writeCredits: " + wi.getCredits() + "]";
}
+
+ /**
+ * @return the remote address of the multiplexed connection that owns this channel's channel set
+ * (may be null when that connection has no TCP connection)
+ */
+ public InetSocketAddress getRemoteAddress() {
+ return cSet.getMultiplexedConnection().getRemoteAddress();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index 31a37ef..f5cdf2c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -239,4 +239,8 @@
ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
}
}
+
+ /**
+ * @return the multiplexed connection this channel set belongs to
+ */
+ public MultiplexedConnection getMultiplexedConnection() {
+ return mConn;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 049cfd8..32bf77e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,22 +33,20 @@
public class FullFrameChannelReadInterface extends AbstractChannelReadInterface {
private static final Logger LOGGER = LogManager.getLogger();
- private final Deque<ByteBuffer> riEmptyStack;
+ private final BlockingDeque<ByteBuffer> riEmptyStack;
private final IChannelControlBlock ccb;
FullFrameChannelReadInterface(IChannelControlBlock ccb) {
this.ccb = ccb;
- riEmptyStack = new ArrayDeque<>();
+ riEmptyStack = new LinkedBlockingDeque<>();
credits = 0;
emptyBufferAcceptor = buffer -> {
- int delta = buffer.remaining();
- synchronized (ccb) {
- if (ccb.isRemotelyClosed()) {
- return;
- }
- riEmptyStack.push(buffer);
+ if (ccb.isRemotelyClosed()) {
+ return;
}
+ riEmptyStack.push(buffer);
+ final int delta = buffer.remaining();
ccb.addPendingCredits(delta);
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index b6a392e..4c3836a 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.net.protocols.muxdemux;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
@@ -437,4 +438,8 @@
}
}
}
+
+ /**
+ * @return the remote address of the underlying TCP connection, or null if no TCP connection is set
+ */
+ public InetSocketAddress getRemoteAddress() {
+ return tcpConnection == null ? null : tcpConnection.getRemoteAddress();
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 7fbde73..2240fd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -53,21 +54,14 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(btreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
-
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@@ -75,17 +69,17 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// create transaction filter <to hide transaction files>
FilenameFilter transactionFilter = getTransactionFileFilter(false);
// List of valid BTree files.
cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles,
btreeFactory.getBufferCache());
HashSet<String> btreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+ int index = cmpFileName.getFileName().lastIndexOf(DELIMITER);
+ btreeFilesSet.add(cmpFileName.getFileName().substring(0, index));
}
if (hasBloomFilter) {
@@ -104,53 +98,51 @@
// Special case: sorting is not required
if (allBTreeFiles.size() == 1 && (!hasBloomFilter || allBloomFilterFiles.size() == 1)) {
- validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef, null,
- hasBloomFilter ? allBloomFilterFiles.get(0).fileRef : null));
+ validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(), null,
+ hasBloomFilter ? allBloomFilterFiles.get(0).getFileRef() : null));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts file names from earliest to latest sequence.
Collections.sort(allBTreeFiles);
if (hasBloomFilter) {
Collections.sort(allBloomFilterFiles);
}
- List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
- ComparableFileName lastBTree = allBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
validComparableBTreeFiles.add(lastBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = null;
- ComparableFileName lastBloomFilter = null;
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = null;
+ IndexComponentFileReference lastBloomFilter = null;
if (hasBloomFilter) {
validComparableBloomFilterFiles = new ArrayList<>();
lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
}
- ComparableFileName currentBTree;
- ComparableFileName currentBloomFilter = null;
+ IndexComponentFileReference currentBTree;
+ IndexComponentFileReference currentBloomFilter = null;
for (int i = 1; i < allBTreeFiles.size(); i++) {
currentBTree = allBTreeFiles.get(i);
if (hasBloomFilter) {
currentBloomFilter = allBloomFilterFiles.get(i);
}
- // Current start timestamp is greater than last stop timestamp.
- if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
- && (!hasBloomFilter || currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0)) {
+ // Current start sequence is greater than last stop sequence.
+ if (currentBTree.isMoreRecentThan(lastBTree)
+ && (!hasBloomFilter || currentBloomFilter.isMoreRecentThan(lastBloomFilter))) {
validComparableBTreeFiles.add(currentBTree);
lastBTree = currentBTree;
if (hasBloomFilter) {
validComparableBloomFilterFiles.add(currentBloomFilter);
lastBloomFilter = currentBloomFilter;
}
- } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
- && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
- && (!hasBloomFilter || (currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0))) {
+ } else if (currentBTree.isWithin(lastBTree)
+ && (!hasBloomFilter || currentBloomFilter.isWithin(lastBloomFilter))) {
// Invalid files are completely contained in last interval.
- delete(btreeFactory.getBufferCache(), currentBTree.fullPath);
+ delete(btreeFactory.getBufferCache(), currentBTree.getFullPath());
if (hasBloomFilter) {
- delete(btreeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ delete(btreeFactory.getBufferCache(), currentBloomFilter.getFullPath());
}
} else {
// This scenario should not be possible.
@@ -161,21 +153,21 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
Collections.sort(validComparableBTreeFiles, recencyCmp);
- Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = null;
+ Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = null;
if (hasBloomFilter) {
Collections.sort(validComparableBloomFilterFiles, recencyCmp);
bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
}
- ComparableFileName cmpBTreeFileName = null;
- ComparableFileName cmpBloomFilterFileName = null;
+ IndexComponentFileReference cmpBTreeFileName = null;
+ IndexComponentFileReference cmpBloomFilterFileName = null;
while (btreeFileIter.hasNext() && (hasBloomFilter ? bloomFilterFileIter.hasNext() : true)) {
cmpBTreeFileName = btreeFileIter.next();
if (hasBloomFilter) {
cmpBloomFilterFileName = bloomFilterFileIter.next();
}
- validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null,
- hasBloomFilter ? cmpBloomFilterFileName.fileRef : null));
+ validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(), null,
+ hasBloomFilter ? cmpBloomFilterFileName.getFileRef() : null));
}
return validFiles;
@@ -183,11 +175,10 @@
@Override
public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
- String ts = getCurrentTimestamp();
+ String sequence = getNextComponentSequence(btreeFilter);
// Create transaction lock file
- IoUtil.create(baseDir.getChild(TXN_PREFIX + ts));
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a transaction
+ IoUtil.create(baseDir.getChild(TXN_PREFIX + sequence));
+ String baseName = sequence; // component files must carry the same sequence as the transaction lock file
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
index 23c2367..8fb3751 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -58,23 +59,15 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(btreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
-
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest
- // timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -83,18 +76,17 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBuddyBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBuddyBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// Create transaction file filter
FilenameFilter transactionFilefilter = getTransactionFileFilter(false);
// Gather files.
cleanupAndGetValidFilesInternal(getCompoundFilter(btreeFilter, transactionFilefilter), btreeFactory,
allBTreeFiles, btreeFactory.getBufferCache());
HashSet<String> btreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+ btreeFilesSet.add(cmpFileName.getSequence());
}
validateFiles(btreeFilesSet, allBuddyBTreeFiles, getCompoundFilter(buddyBtreeFilter, transactionFilefilter),
buddyBtreeFactory, btreeFactory.getBufferCache());
@@ -109,52 +101,47 @@
return validFiles;
}
if (allBTreeFiles.size() == 1 && allBuddyBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef,
- allBuddyBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(),
+ allBuddyBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts files names from earliest to latest sequence.
Collections.sort(allBTreeFiles);
Collections.sort(allBuddyBTreeFiles);
Collections.sort(allBloomFilterFiles);
- List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
- ComparableFileName lastBTree = allBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
validComparableBTreeFiles.add(lastBTree);
- List<ComparableFileName> validComparableBuddyBTreeFiles = new ArrayList<>();
- ComparableFileName lastBuddyBTree = allBuddyBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBuddyBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBuddyBTree = allBuddyBTreeFiles.get(0);
validComparableBuddyBTreeFiles.add(lastBuddyBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
- ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+ IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
for (int i = 1; i < allBTreeFiles.size(); i++) {
- ComparableFileName currentBTree = allBTreeFiles.get(i);
- ComparableFileName currentBuddyBTree = allBuddyBTreeFiles.get(i);
- ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
- // Current start timestamp is greater than last stop timestamp.
- if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
- && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[1]) > 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+ IndexComponentFileReference currentBTree = allBTreeFiles.get(i);
+ IndexComponentFileReference currentBuddyBTree = allBuddyBTreeFiles.get(i);
+ IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+ // Current start sequence is greater than last stop sequence
+ if (currentBTree.isMoreRecentThan(lastBTree) && currentBuddyBTree.isMoreRecentThan(lastBuddyBTree)
+ && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
validComparableBTreeFiles.add(currentBTree);
validComparableBuddyBTreeFiles.add(currentBuddyBTree);
validComparableBloomFilterFiles.add(currentBloomFilter);
lastBTree = currentBTree;
lastBuddyBTree = currentBuddyBTree;
lastBloomFilter = currentBloomFilter;
- } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
- && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
- && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[0]) >= 0
- && currentBuddyBTree.interval[1].compareTo(lastBuddyBTree.interval[1]) <= 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
- // Invalid files are completely contained in last interval.
- delete(treeFactory.getBufferCache(), currentBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBuddyBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ } else if (currentBTree.isWithin(lastBTree) && currentBuddyBTree.isWithin(lastBuddyBTree)
+ && currentBloomFilter.isWithin(lastBloomFilter)) {
+ // Invalid files are completely contained in last sequence.
+ delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBuddyBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
} else {
// This scenario should not be possible.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -163,19 +150,19 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
- Collections.sort(validComparableBTreeFiles, recencyCmp);
- Collections.sort(validComparableBuddyBTreeFiles, recencyCmp);
- Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+ validComparableBTreeFiles.sort(recencyCmp);
+ validComparableBuddyBTreeFiles.sort(recencyCmp);
+ validComparableBloomFilterFiles.sort(recencyCmp);
- Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
- Iterator<ComparableFileName> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+ Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
while (btreeFileIter.hasNext() && buddyBtreeFileIter.hasNext()) {
- ComparableFileName cmpBTreeFileName = btreeFileIter.next();
- ComparableFileName cmpBuddyBTreeFileName = buddyBtreeFileIter.next();
- ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, cmpBuddyBTreeFileName.fileRef,
- cmpBloomFilterFileName.fileRef));
+ IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next();
+ IndexComponentFileReference cmpBuddyBTreeFileName = buddyBtreeFileIter.next();
+ IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+ validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(),
+ cmpBuddyBTreeFileName.getFileRef(), cmpBloomFilterFileName.getFileRef()));
}
return validFiles;
@@ -183,10 +170,9 @@
@Override
public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
- String ts = getCurrentTimestamp();
// Create transaction lock file
- Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts));
- String baseName = ts + DELIMITER + ts;
+ String baseName = getNextComponentSequence(btreeFilter);
+ Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName));
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
index e6aa2d1..a5c6360 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -38,5 +38,4 @@
* @return the index of the current memory component
*/
int getCurrentComponentIndex();
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 57fd01d..1f481c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -21,12 +21,9 @@
import java.io.FilenameFilter;
import java.io.IOException;
-import java.text.Format;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -42,7 +39,9 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+@NotThreadSafe
public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManager {
public enum TreeIndexState {
@@ -76,22 +75,18 @@
*/
public static final String TXN_PREFIX = ".T";
- public static final String COMPONENT_TIMESTAMP_FORMAT = "yyyy-MM-dd-HH-mm-ss-SSS";
-
public static final FilenameFilter COMPONENT_FILES_FILTER = (dir, name) -> !name.startsWith(".");
protected static final FilenameFilter txnFileNameFilter = (dir, name) -> name.startsWith(TXN_PREFIX);
protected static FilenameFilter bloomFilterFilter =
(dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
- protected static final FilenameFilter dummyFilter = (dir, name) -> true;
protected static final Comparator<String> cmp = new FileNameComparator();
+ private static final FilenameFilter dummyFilter = (dir, name) -> true;
protected final IIOManager ioManager;
// baseDir should reflect dataset name and partition name and be absolute
protected final FileReference baseDir;
- protected final Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
- protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
+ protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
- private String prevTimestamp = null;
public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeFactory) {
@@ -131,18 +126,18 @@
}
protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles,
+ TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
IBufferCache bufferCache) throws HyracksDataException {
String[] files = listDirFiles(baseDir, filter);
for (String fileName : files) {
FileReference fileRef = baseDir.getChild(fileName);
if (treeFactory == null) {
- allFiles.add(new ComparableFileName(fileRef));
+ allFiles.add(IndexComponentFileReference.of(fileRef));
continue;
}
TreeIndexState idxState = isValidTreeIndex(treeFactory.createIndexInstance(fileRef));
if (idxState == TreeIndexState.VALID) {
- allFiles.add(new ComparableFileName(fileRef));
+ allFiles.add(IndexComponentFileReference.of(fileRef));
} else if (idxState == TreeIndexState.INVALID) {
bufferCache.deleteFile(fileRef);
}
@@ -167,18 +162,16 @@
return files;
}
- protected void validateFiles(HashSet<String> groundTruth, ArrayList<ComparableFileName> validFiles,
+ protected void validateFiles(HashSet<String> groundTruth, ArrayList<IndexComponentFileReference> validFiles,
FilenameFilter filter, TreeIndexFactory<? extends ITreeIndex> treeFactory, IBufferCache bufferCache)
throws HyracksDataException {
- ArrayList<ComparableFileName> tmpAllInvListsFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> tmpAllInvListsFiles = new ArrayList<>();
cleanupAndGetValidFilesInternal(filter, treeFactory, tmpAllInvListsFiles, bufferCache);
- for (ComparableFileName cmpFileName : tmpAllInvListsFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- String file = cmpFileName.fileName.substring(0, index);
- if (groundTruth.contains(file)) {
+ for (IndexComponentFileReference cmpFileName : tmpAllInvListsFiles) {
+ if (groundTruth.contains(cmpFileName.getSequence())) {
validFiles.add(cmpFileName);
} else {
- delete(bufferCache, cmpFileName.fullPath);
+ delete(bufferCache, cmpFileName.getFullPath());
}
}
}
@@ -198,30 +191,20 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- // Begin timestamp and end timestamp are identical since it is a flush
- return new LSMComponentFileReferences(baseDir.getChild(ts + DELIMITER + ts), null, null);
+ final String sequence = getNextComponentSequence(COMPONENT_FILES_FILTER);
+ return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null);
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
- String start = firstTimestampRange[0];
- String end = lastTimestampRange[1];
- if (end.compareTo(start) <= 0) {
- throw new IllegalArgumentException(
- "A Merge file must have end greater than start. Found end: " + end + " and start: " + start);
- }
- // Get the range of timestamps by taking the earliest and the latest timestamps
- return new LSMComponentFileReferences(baseDir.getChild(start + DELIMITER + end), null, null);
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
+ return new LSMComponentFileReferences(baseDir.getChild(baseName), null, null);
}
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allFiles = new ArrayList<>();
// Gather files and delete invalid files
// There are two types of invalid files:
@@ -235,40 +218,37 @@
}
if (allFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
+ validFiles.add(new LSMComponentFileReferences(allFiles.get(0).getFileRef(), null, null));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts files names from earliest to latest
Collections.sort(allFiles);
- List<ComparableFileName> validComparableFiles = new ArrayList<>();
- ComparableFileName last = allFiles.get(0);
+ List<IndexComponentFileReference> validComparableFiles = new ArrayList<>();
+ IndexComponentFileReference last = allFiles.get(0);
validComparableFiles.add(last);
for (int i = 1; i < allFiles.size(); i++) {
- ComparableFileName current = allFiles.get(i);
- // The current start timestamp is greater than last stop timestamp so current is valid.
- if (current.interval[0].compareTo(last.interval[1]) > 0) {
+ IndexComponentFileReference current = allFiles.get(i);
+ if (current.isMoreRecentThan(last)) {
+ // The current start sequence is greater than last stop sequence so current is valid.
validComparableFiles.add(current);
last = current;
- } else if (current.interval[0].compareTo(last.interval[0]) >= 0
- && current.interval[1].compareTo(last.interval[1]) <= 0) {
+ } else if (current.isWithin(last)) {
// The current file is completely contained in the interval of the
// last file. Thus the last file must contain at least as much information
// as the current file, so delete the current file.
- delete(treeFactory.getBufferCache(), current.fullPath);
+ delete(treeFactory.getBufferCache(), current.getFullPath());
} else {
// This scenario should not be possible since timestamps are monotonically increasing.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
}
}
-
// Sort valid files in reverse lexicographical order, such that newer files come first.
- Collections.sort(validComparableFiles, recencyCmp);
- for (ComparableFileName cmpFileName : validComparableFiles) {
- validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
+ validComparableFiles.sort(recencyCmp);
+ for (IndexComponentFileReference cmpFileName : validComparableFiles) {
+ validFiles.add(new LSMComponentFileReferences(cmpFileName.getFileRef(), null, null));
}
-
return validFiles;
}
@@ -287,8 +267,7 @@
private static class FileNameComparator implements Comparator<String> {
@Override
public int compare(String a, String b) {
- // Consciously ignoring locale.
- return -a.compareTo(b);
+ return IndexComponentFileReference.of(b).compareTo(IndexComponentFileReference.of(a));
}
}
@@ -309,45 +288,14 @@
}
}
- protected class ComparableFileName implements Comparable<ComparableFileName> {
- public final FileReference fileRef;
- public final String fullPath;
- public final String fileName;
-
- // Timestamp interval.
- public final String[] interval;
-
- public ComparableFileName(FileReference fileRef) {
- this.fileRef = fileRef;
- this.fullPath = fileRef.getFile().getAbsolutePath();
- this.fileName = fileRef.getFile().getName();
- interval = fileName.split(DELIMITER);
- }
-
+ private class RecencyComparator implements Comparator<IndexComponentFileReference> {
@Override
- public int compareTo(ComparableFileName b) {
- int startCmp = interval[0].compareTo(b.interval[0]);
+ public int compare(IndexComponentFileReference a, IndexComponentFileReference b) {
+ int startCmp = -Long.compare(a.getSequenceStart(), b.getSequenceStart());
if (startCmp != 0) {
return startCmp;
}
- return b.interval[1].compareTo(interval[1]);
- }
-
- @Override
- public String toString() {
- return "{\"type\" : \"" + (interval[0].equals(interval[1]) ? "flush" : "merge") + "\", \"start\" : \""
- + interval[0] + "\", \"end\" : \"" + interval[1] + "\"}";
- }
- }
-
- private class RecencyComparator implements Comparator<ComparableFileName> {
- @Override
- public int compare(ComparableFileName a, ComparableFileName b) {
- int cmp = -a.interval[0].compareTo(b.interval[0]);
- if (cmp != 0) {
- return cmp;
- }
- return -a.interval[1].compareTo(b.interval[1]);
+ return -Long.compare(a.getSequenceEnd(), b.getSequenceEnd());
}
}
@@ -382,10 +330,10 @@
return null;
}
- protected static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) {
+ private static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) {
final String timeStamp =
transactionFileName.substring(transactionFileName.indexOf(TXN_PREFIX) + TXN_PREFIX.length());
- return (dir, name) -> inclusive ? name.startsWith(timeStamp) : !name.startsWith(timeStamp);
+ return (dir, name) -> inclusive == (name.equals(timeStamp) || name.startsWith(timeStamp + DELIMITER));
}
protected FilenameFilter getTransactionFileFilter(boolean inclusive) throws HyracksDataException {
@@ -406,34 +354,12 @@
return (dir, name) -> filter1.accept(dir, name) && filter2.accept(dir, name);
}
- /**
- * @return The string format of the current timestamp.
- * The returned results of this method are guaranteed to not have duplicates.
- */
- protected String getCurrentTimestamp() {
- Date date = new Date();
- String ts = formatter.format(date);
- /**
- * prevent a corner case where the same timestamp can be given.
- */
- while (prevTimestamp != null && ts.compareTo(prevTimestamp) == 0) {
- try {
- Thread.sleep(1);
- date = new Date();
- ts = formatter.format(date);
- } catch (InterruptedException e) {
- //ignore
- }
+ protected String getNextComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
+ long maxComponentSeq = -1;
+ final String[] files = listDirFiles(baseDir, filenameFilter);
+ for (String fileName : files) {
+ maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
}
- prevTimestamp = ts;
- return ts;
- }
-
- public static String getComponentStartTime(String fileName) {
- return fileName.split(DELIMITER)[0];
- }
-
- public static String getComponentEndTime(String fileName) {
- return fileName.split(DELIMITER)[1];
+ return IndexComponentFileReference.getFlushSequence(maxComponentSeq + 1);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java
new file mode 100644
index 0000000..bbadf60
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.DELIMITER;
+
+import java.util.Objects;
+
+import org.apache.hyracks.api.io.FileReference;
+
+/**
+ * Parsed view of an LSM index component file name. The name starts with two numeric fields separated by
+ * {@code DELIMITER}: the first and last component sequence numbers the file covers. A flushed component has
+ * equal start and end; a merged component spans from the first merged component's start to the last one's end.
+ * Any further {@code DELIMITER}-separated parts of the name (e.g. a file-type suffix) are ignored when parsing.
+ * <p>
+ * All fields are final. Note that {@link #compareTo} orders by sequence range only, whereas {@link #equals}
+ * compares the whole file name, so e.g. two files of the same component that differ only by suffix compare as
+ * {@code 0} but are not equal.
+ */
+public class IndexComponentFileReference implements Comparable<IndexComponentFileReference> {
+
+    private final FileReference fileRef;
+    private final String fullPath;
+    private final String fileName;
+    private final long sequenceStart;
+    private final long sequenceEnd;
+
+    private IndexComponentFileReference(FileReference fileRef, String fullPath, String fileName,
+            long sequenceStart, long sequenceEnd) {
+        this.fileRef = fileRef;
+        this.fullPath = fullPath;
+        this.fileName = fileName;
+        this.sequenceStart = sequenceStart;
+        this.sequenceEnd = sequenceEnd;
+    }
+
+    /**
+     * Parses a bare component file name. The returned reference has no {@link FileReference} and no full path.
+     *
+     * @throws IllegalArgumentException if the name does not start with two {@code DELIMITER}-separated longs
+     */
+    public static IndexComponentFileReference of(String file) {
+        return parse(file, null, null);
+    }
+
+    /**
+     * Parses the name of {@code fileRef} and keeps the reference and its absolute path.
+     *
+     * @throws IllegalArgumentException if the file name does not start with two {@code DELIMITER}-separated longs
+     */
+    public static IndexComponentFileReference of(FileReference fileRef) {
+        return parse(fileRef.getFile().getName(), fileRef, fileRef.getFile().getAbsolutePath());
+    }
+
+    private static IndexComponentFileReference parse(String fileName, FileReference fileRef, String fullPath) {
+        final String[] splits = fileName.split(DELIMITER);
+        if (splits.length < 2) {
+            // fail with the offending name instead of an opaque ArrayIndexOutOfBoundsException
+            throw new IllegalArgumentException("Invalid LSM component file name: " + fileName);
+        }
+        // a non-numeric field raises NumberFormatException, which is itself an IllegalArgumentException
+        return new IndexComponentFileReference(fileRef, fullPath, fileName, Long.parseLong(splits[0]),
+                Long.parseLong(splits[1]));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IndexComponentFileReference that = (IndexComponentFileReference) o;
+        return Objects.equals(fileName, that.fileName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fileName);
+    }
+
+    /**
+     * Orders by ascending sequence start and, for equal starts, by descending sequence end, so a wider
+     * (merged) component sorts before any narrower component that begins at the same sequence.
+     */
+    @Override
+    public int compareTo(IndexComponentFileReference o) {
+        int startCmp = Long.compare(sequenceStart, o.sequenceStart);
+        if (startCmp != 0) {
+            return startCmp;
+        }
+        return Long.compare(o.sequenceEnd, sequenceEnd);
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public long getSequenceStart() {
+        return sequenceStart;
+    }
+
+    public long getSequenceEnd() {
+        return sequenceEnd;
+    }
+
+    /** @return the absolute path, or {@code null} if this reference was parsed from a bare name */
+    public String getFullPath() {
+        return fullPath;
+    }
+
+    /** @return the file reference, or {@code null} if this reference was parsed from a bare name */
+    public FileReference getFileRef() {
+        return fileRef;
+    }
+
+    /** @return the sequence range rendered as start, {@code DELIMITER}, end */
+    public String getSequence() {
+        return sequenceStart + DELIMITER + sequenceEnd;
+    }
+
+    /** @return true if this component starts strictly after {@code other} ends (the ranges do not overlap) */
+    public boolean isMoreRecentThan(IndexComponentFileReference other) {
+        return sequenceStart > other.getSequenceEnd();
+    }
+
+    /** @return true if this component's sequence range is contained (inclusively) in {@code other}'s */
+    public boolean isWithin(IndexComponentFileReference other) {
+        return sequenceStart >= other.getSequenceStart() && sequenceEnd <= other.getSequenceEnd();
+    }
+
+    @Override
+    public String toString() {
+        return "{\"type\" : \"" + (isFlush() ? "flush" : "merge") + "\", \"start\" : \"" + sequenceStart
+                + "\", \"end\" : \"" + sequenceEnd + "\"}";
+    }
+
+    private boolean isFlush() {
+        return sequenceStart == sequenceEnd;
+    }
+
+    /** @return the base name of a flushed component with the given sequence (start and end are equal) */
+    public static String getFlushSequence(long componentSequence) {
+        return componentSequence + DELIMITER + componentSequence;
+    }
+
+    /**
+     * @return the base name of a component produced by merging {@code firstComponentName} through
+     *         {@code lastComponentName}: the first one's sequence start and the last one's sequence end
+     */
+    public static String getMergeSequence(String firstComponentName, String lastComponentName) {
+        long mergeSequenceStart = IndexComponentFileReference.of(firstComponentName).getSequenceStart();
+        long mergeSequenceEnd = IndexComponentFileReference.of(lastComponentName).getSequenceEnd();
+        return mergeSequenceStart + DELIMITER + mergeSequenceEnd;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
index c7990a6..cf6c4a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -24,12 +24,14 @@
public class LSMComponentId implements ILSMComponentId {
public static final long NOT_FOUND = -1;
+ public static final long MIN_VALID_COMPONENT_ID = 0;
// Used to represent an empty index with no components
public static final LSMComponentId EMPTY_INDEX_LAST_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
// A default component id used for bulk loaded component
- public static final LSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+ public static final LSMComponentId DEFAULT_COMPONENT_ID =
+ new LSMComponentId(MIN_VALID_COMPONENT_ID, MIN_VALID_COMPONENT_ID);
private long minId;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
index 3da57fd..21a27a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -21,55 +21,40 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.util.annotations.ThreadSafe;
/**
* A default implementation of {@link ILSMComponentIdGenerator}.
- *
*/
+@ThreadSafe
public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
private final int numComponents;
private int currentComponentIndex;
- protected long previousTimestamp = -1L;
+ private long lastUsedId; // most recently issued id; refresh() pre-increments it
private ILSMComponentId componentId;
- public LSMComponentIdGenerator(int numComponents) {
+ public LSMComponentIdGenerator(int numComponents, long lastUsedId) { // first id issued: lastUsedId + 1
this.numComponents = numComponents;
+ this.lastUsedId = lastUsedId;
refresh();
currentComponentIndex = 0;
}
@Override
- public void refresh() {
- long ts = getCurrentTimestamp();
- componentId = new LSMComponentId(ts, ts);
+ public synchronized void refresh() { // issue the next id and rotate the component index
+ final long nextId = ++lastUsedId; // monotonic counter replacing the old wall-clock timestamp
+ componentId = new LSMComponentId(nextId, nextId); // min and max are the same id
currentComponentIndex = (currentComponentIndex + 1) % numComponents;
}
@Override
- public ILSMComponentId getId() {
+ public synchronized ILSMComponentId getId() {
return componentId;
}
@Override
- public int getCurrentComponentIndex() {
+ public synchronized int getCurrentComponentIndex() {
return currentComponentIndex;
}
-
- protected long getCurrentTimestamp() {
- long timestamp = System.currentTimeMillis();
- while (timestamp <= previousTimestamp) {
- // make sure timestamp is strictly increasing
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- timestamp = System.currentTimeMillis();
- }
- previousTimestamp = timestamp;
- return timestamp;
-
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 2f1eb87..4471102 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
@@ -57,21 +58,15 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(deletedKeysBTreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -80,18 +75,17 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allDictBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allInvListsFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allDeletedKeysBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allDictBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allInvListsFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allDeletedKeysBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// Gather files.
cleanupAndGetValidFilesInternal(deletedKeysBTreeFilter, btreeFactory, allDeletedKeysBTreeFiles,
btreeFactory.getBufferCache());
HashSet<String> deletedKeysBTreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allDeletedKeysBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- deletedKeysBTreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allDeletedKeysBTreeFiles) {
+ deletedKeysBTreeFilesSet.add(cmpFileName.getSequence()); // presumably name minus suffix, as before
}
// TODO: do we really need to validate the inverted lists files or is validating the dict. BTrees is enough?
@@ -116,52 +110,48 @@
if (allDictBTreeFiles.size() == 1 && allInvListsFiles.size() == 1 && allDeletedKeysBTreeFiles.size() == 1
&& allBloomFilterFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).fileRef,
- allDeletedKeysBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).getFileRef(),
+ allDeletedKeysBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts file names from earliest to latest sequence.
Collections.sort(allDeletedKeysBTreeFiles);
Collections.sort(allDictBTreeFiles);
Collections.sort(allBloomFilterFiles);
- List<ComparableFileName> validComparableDictBTreeFiles = new ArrayList<>();
- ComparableFileName lastDictBTree = allDictBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableDictBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastDictBTree = allDictBTreeFiles.get(0);
validComparableDictBTreeFiles.add(lastDictBTree);
- List<ComparableFileName> validComparableDeletedKeysBTreeFiles = new ArrayList<>();
- ComparableFileName lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableDeletedKeysBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0);
validComparableDeletedKeysBTreeFiles.add(lastDeletedKeysBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
- ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+ IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
for (int i = 1; i < allDictBTreeFiles.size(); i++) {
- ComparableFileName currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i);
- ComparableFileName currentDictBTree = allDictBTreeFiles.get(i);
- ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
- // Current start timestamp is greater than last stop timestamp.
- if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[1]) > 0
- && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[1]) > 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+ IndexComponentFileReference currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i);
+ IndexComponentFileReference currentDictBTree = allDictBTreeFiles.get(i);
+ IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+ // Current start sequence is greater than the last valid file's end sequence.
+ if (currentDeletedKeysBTree.isMoreRecentThan(lastDeletedKeysBTree)
+ && currentDictBTree.isMoreRecentThan(lastDictBTree)
+ && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
validComparableDictBTreeFiles.add(currentDictBTree);
validComparableDeletedKeysBTreeFiles.add(currentDeletedKeysBTree);
validComparableBloomFilterFiles.add(currentBloomFilter);
lastDictBTree = currentDictBTree;
lastDeletedKeysBTree = currentDeletedKeysBTree;
lastBloomFilter = currentBloomFilter;
- } else if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[0]) >= 0
- && currentDeletedKeysBTree.interval[1].compareTo(lastDeletedKeysBTree.interval[1]) <= 0
- && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[0]) >= 0
- && currentDictBTree.interval[1].compareTo(lastDictBTree.interval[1]) <= 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
- // Invalid files are completely contained in last interval.
- delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentDictBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ } else if (currentDeletedKeysBTree.isWithin(lastDeletedKeysBTree)
+ && currentDictBTree.isWithin(lastDictBTree) && currentBloomFilter.isWithin(lastBloomFilter)) {
+ // Invalid files: their sequence range lies entirely within the last valid file's range.
+ delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentDictBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
} else {
// This scenario should not be possible.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -170,21 +160,20 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
- Collections.sort(validComparableDictBTreeFiles, recencyCmp);
- Collections.sort(validComparableDeletedKeysBTreeFiles, recencyCmp);
- Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+ validComparableDictBTreeFiles.sort(recencyCmp);
+ validComparableDeletedKeysBTreeFiles.sort(recencyCmp);
+ validComparableBloomFilterFiles.sort(recencyCmp);
- Iterator<ComparableFileName> dictBTreeFileIter = validComparableDictBTreeFiles.iterator();
- Iterator<ComparableFileName> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+ Iterator<IndexComponentFileReference> dictBTreeFileIter = validComparableDictBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
while (dictBTreeFileIter.hasNext() && deletedKeysBTreeIter.hasNext()) {
- ComparableFileName cmpDictBTreeFile = dictBTreeFileIter.next();
- ComparableFileName cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
- ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef,
- cmpBloomFilterFileName.fileRef));
+ IndexComponentFileReference cmpDictBTreeFile = dictBTreeFileIter.next();
+ IndexComponentFileReference cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
+ IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+ validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.getFileRef(),
+ cmpDeletedKeysBTreeFile.getFileRef(), cmpBloomFilterFileName.getFileRef()));
}
-
return validFiles;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 2512776..3348407 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -58,22 +59,15 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(btreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest
- // timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -82,9 +76,9 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allRTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allRTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// Create a transaction filter <- to hide transaction components->
FilenameFilter transactionFilter = getTransactionFileFilter(false);
@@ -93,9 +87,8 @@
cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles,
btreeFactory.getBufferCache());
HashSet<String> btreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+ btreeFilesSet.add(cmpFileName.getSequence()); // presumably name minus suffix, as before
}
validateFiles(btreeFilesSet, allRTreeFiles, getCompoundFilter(transactionFilter, rtreeFilter), rtreeFactory,
btreeFactory.getBufferCache());
@@ -113,52 +106,47 @@
}
if (allRTreeFiles.size() == 1 && allBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef,
- allBloomFilterFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).getFileRef(),
+ allBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts file names from earliest to latest sequence.
Collections.sort(allRTreeFiles);
Collections.sort(allBTreeFiles);
Collections.sort(allBloomFilterFiles);
- List<ComparableFileName> validComparableRTreeFiles = new ArrayList<>();
- ComparableFileName lastRTree = allRTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableRTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastRTree = allRTreeFiles.get(0);
validComparableRTreeFiles.add(lastRTree);
- List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
- ComparableFileName lastBTree = allBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
validComparableBTreeFiles.add(lastBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
- ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+ IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
for (int i = 1; i < allRTreeFiles.size(); i++) {
- ComparableFileName currentRTree = allRTreeFiles.get(i);
- ComparableFileName currentBTree = allBTreeFiles.get(i);
- ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
- // Current start timestamp is greater than last stop timestamp.
- if (currentRTree.interval[0].compareTo(lastRTree.interval[1]) > 0
- && currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+ IndexComponentFileReference currentRTree = allRTreeFiles.get(i);
+ IndexComponentFileReference currentBTree = allBTreeFiles.get(i);
+ IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+ // Current start sequence is greater than the last valid file's end sequence.
+ if (currentRTree.isMoreRecentThan(lastRTree) && currentBTree.isMoreRecentThan(lastBTree)
+ && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
validComparableRTreeFiles.add(currentRTree);
validComparableBTreeFiles.add(currentBTree);
validComparableBloomFilterFiles.add(currentBloomFilter);
lastRTree = currentRTree;
lastBTree = currentBTree;
lastBloomFilter = currentBloomFilter;
- } else if (currentRTree.interval[0].compareTo(lastRTree.interval[0]) >= 0
- && currentRTree.interval[1].compareTo(lastRTree.interval[1]) <= 0
- && currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
- && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
- // Invalid files are completely contained in last interval.
- delete(treeFactory.getBufferCache(), currentRTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ } else if (currentRTree.isWithin(lastRTree) && currentBTree.isWithin(lastBTree)
+ && currentBloomFilter.isWithin(lastBloomFilter)) {
+ // Invalid files: their sequence range lies entirely within the last valid file's range.
+ delete(treeFactory.getBufferCache(), currentRTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
} else {
// This scenario should not be possible.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -167,29 +155,28 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
- Collections.sort(validComparableRTreeFiles, recencyCmp);
- Collections.sort(validComparableBTreeFiles, recencyCmp);
- Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+ validComparableRTreeFiles.sort(recencyCmp);
+ validComparableBTreeFiles.sort(recencyCmp);
+ validComparableBloomFilterFiles.sort(recencyCmp);
- Iterator<ComparableFileName> rtreeFileIter = validComparableRTreeFiles.iterator();
- Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+ Iterator<IndexComponentFileReference> rtreeFileIter = validComparableRTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
while (rtreeFileIter.hasNext() && btreeFileIter.hasNext()) {
- ComparableFileName cmpRTreeFileName = rtreeFileIter.next();
- ComparableFileName cmpBTreeFileName = btreeFileIter.next();
- ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef,
- cmpBloomFilterFileName.fileRef));
+ IndexComponentFileReference cmpRTreeFileName = rtreeFileIter.next();
+ IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next();
+ IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+ validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.getFileRef(), cmpBTreeFileName.getFileRef(),
+ cmpBloomFilterFileName.getFileRef()));
}
return validFiles;
}
@Override
public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
- String ts = getCurrentTimestamp();
+ String baseName = getNextComponentSequence(btreeFilter);
// Create transaction lock file
- Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts));
- String baseName = ts + DELIMITER + ts;
+ Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName)); // lock file keyed by sequence, not timestamp
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
index 8ecbcc4..c255ee5 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
@@ -22,16 +22,19 @@
import java.io.FilenameFilter;
import java.util.ArrayList;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
public class TestLsmIndexFileManager extends AbstractLSMIndexFileManager {
+ private long componentSeq = 0; // next sequence handed out by getRelFlushFileReference()
+
public TestLsmIndexFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeIndexFactory) {
super(ioManager, file, treeIndexFactory);
@@ -39,12 +42,18 @@
@Override
protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles,
- IBufferCache bufferCache) throws HyracksDataException {
+ TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
+ IBufferCache bufferCache) {
String[] files = baseDir.getFile().list(filter);
for (String fileName : files) {
FileReference fileRef = baseDir.getChild(fileName);
- allFiles.add(new ComparableFileName(fileRef));
+ allFiles.add(IndexComponentFileReference.of(fileRef));
}
}
+
+ @Override
+ public LSMComponentFileReferences getRelFlushFileReference() {
+ String sequence = IndexComponentFileReference.getFlushSequence(componentSeq++);
+ return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null); // only first ref is set
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
index e75a961..183cb6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -29,6 +29,10 @@
spanNanos = unit.toNanos(span);
}
+ public long getSpanNanos() {
+ return spanNanos; // span length in nanoseconds (converted via unit.toNanos in the constructor)
+ }
+
public static Span start(long span, TimeUnit unit) {
return new Span(span, unit);
}