Merge commit '142aad08' from stabilization-f69489
Change-Id: I7ebe1f339fa27fc269f56f4e394622b452310638
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index b1a1fcc..453ffa0 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -26,7 +26,6 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -135,9 +134,4 @@
public final IFrameWriter getInputFrameWriter(int index) {
return null;
}
-
- @Override
- public JobId getJobId() {
- return ctx.getJobletContext().getJobId();
- }
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 2da7193..a52f01e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -21,7 +21,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
public interface IActiveRuntime {
@@ -44,11 +43,6 @@
void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException;
/**
- * @return the job id associated with this active runtime
- */
- JobId getJobId();
-
- /**
* @return the runtime stats for monitoring purposes
*/
default String getStats() {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 1c762af..03c4bc5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -23,8 +23,10 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -736,16 +738,18 @@
throws CompilationException {
List<Expression> recordExprs = new ArrayList<>();
List<FieldBinding> fieldBindings = new ArrayList<>();
+ Set<String> fieldNames = new HashSet<>();
+
for (Projection projection : selectRegular.getProjections()) {
if (projection.varStar()) {
- SourceLocation sourceLoc = projection.getSourceLocation();
if (!fieldBindings.isEmpty()) {
RecordConstructor recordConstr = new RecordConstructor(new ArrayList<>(fieldBindings));
- recordConstr.setSourceLocation(sourceLoc);
+ recordConstr.setSourceLocation(selectRegular.getSourceLocation());
recordExprs.add(recordConstr);
fieldBindings.clear();
}
Expression projectionExpr = projection.getExpression();
+ SourceLocation sourceLoc = projection.getSourceLocation();
CallExpr toObjectExpr = new CallExpr(new FunctionSignature(BuiltinFunctions.TO_OBJECT),
Collections.singletonList(projectionExpr));
toObjectExpr.setSourceLocation(sourceLoc);
@@ -755,21 +759,22 @@
recordExprs.add(ifMissingOrNullExpr);
} else if (projection.star()) {
if (selectBlock.hasGroupbyClause()) {
- getGroupBindings(selectBlock.getGroupbyClause(), fieldBindings);
+ getGroupBindings(selectBlock.getGroupbyClause(), fieldBindings, fieldNames);
if (selectBlock.hasLetClausesAfterGroupby()) {
- getLetBindings(selectBlock.getLetListAfterGroupby(), fieldBindings);
+ getLetBindings(selectBlock.getLetListAfterGroupby(), fieldBindings, fieldNames);
}
} else if (selectBlock.hasFromClause()) {
- getFromBindings(selectBlock.getFromClause(), fieldBindings);
+ getFromBindings(selectBlock.getFromClause(), fieldBindings, fieldNames);
if (selectBlock.hasLetClauses()) {
- getLetBindings(selectBlock.getLetList(), fieldBindings);
+ getLetBindings(selectBlock.getLetList(), fieldBindings, fieldNames);
}
} else if (selectBlock.hasLetClauses()) {
- getLetBindings(selectBlock.getLetList(), fieldBindings);
+ getLetBindings(selectBlock.getLetList(), fieldBindings, fieldNames);
}
+ } else if (projection.hasName()) {
+ fieldBindings.add(getFieldBinding(projection, fieldNames));
} else {
- fieldBindings.add(new FieldBinding(new LiteralExpr(new StringLiteral(projection.getName())),
- projection.getExpression()));
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, projection.getSourceLocation());
}
}
if (!fieldBindings.isEmpty()) {
@@ -789,49 +794,68 @@
}
// Generates all field bindings according to the from clause.
- private void getFromBindings(FromClause fromClause, List<FieldBinding> outFieldBindings) {
+ private void getFromBindings(FromClause fromClause, List<FieldBinding> outFieldBindings, Set<String> outFieldNames)
+ throws CompilationException {
for (FromTerm fromTerm : fromClause.getFromTerms()) {
- outFieldBindings.add(getFieldBinding(fromTerm.getLeftVariable()));
+ outFieldBindings.add(getFieldBinding(fromTerm.getLeftVariable(), outFieldNames));
if (fromTerm.hasPositionalVariable()) {
- outFieldBindings.add(getFieldBinding(fromTerm.getPositionalVariable()));
+ outFieldBindings.add(getFieldBinding(fromTerm.getPositionalVariable(), outFieldNames));
}
if (!fromTerm.hasCorrelateClauses()) {
continue;
}
for (AbstractBinaryCorrelateClause correlateClause : fromTerm.getCorrelateClauses()) {
- outFieldBindings.add(getFieldBinding(correlateClause.getRightVariable()));
+ outFieldBindings.add(getFieldBinding(correlateClause.getRightVariable(), outFieldNames));
if (correlateClause.hasPositionalVariable()) {
- outFieldBindings.add(getFieldBinding(correlateClause.getPositionalVariable()));
+ outFieldBindings.add(getFieldBinding(correlateClause.getPositionalVariable(), outFieldNames));
}
}
}
}
// Generates all field bindings according to the group by clause.
- private void getGroupBindings(GroupbyClause groupbyClause, List<FieldBinding> outFieldBindings) {
+ private void getGroupBindings(GroupbyClause groupbyClause, List<FieldBinding> outFieldBindings,
+ Set<String> outFieldNames) throws CompilationException {
for (GbyVariableExpressionPair pair : groupbyClause.getGbyPairList()) {
- outFieldBindings.add(getFieldBinding(pair.getVar()));
+ outFieldBindings.add(getFieldBinding(pair.getVar(), outFieldNames));
}
if (groupbyClause.hasGroupVar()) {
- outFieldBindings.add(getFieldBinding(groupbyClause.getGroupVar()));
+ outFieldBindings.add(getFieldBinding(groupbyClause.getGroupVar(), outFieldNames));
}
if (groupbyClause.hasWithMap()) {
- throw new IllegalStateException(groupbyClause.getWithVarMap().values().toString()); // no WITH in SQLPP
+ // no WITH in SQLPP
+ throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, groupbyClause.getSourceLocation(),
+ groupbyClause.getWithVarMap().values().toString());
}
}
// Generates all field bindings according to the let clause.
- private void getLetBindings(List<LetClause> letClauses, List<FieldBinding> outFieldBindings) {
+ private void getLetBindings(List<LetClause> letClauses, List<FieldBinding> outFieldBindings,
+ Set<String> outFieldNames) throws CompilationException {
for (LetClause letClause : letClauses) {
- outFieldBindings.add(getFieldBinding(letClause.getVarExpr()));
+ outFieldBindings.add(getFieldBinding(letClause.getVarExpr(), outFieldNames));
}
}
// Generates a field binding for a variable.
- private FieldBinding getFieldBinding(VariableExpr var) {
- LiteralExpr fieldName = new LiteralExpr(
- new StringLiteral(SqlppVariableUtil.variableNameToDisplayedFieldName(var.getVar().getValue())));
- return new FieldBinding(fieldName, var);
+ private FieldBinding getFieldBinding(VariableExpr varExpr, Set<String> outFieldNames) throws CompilationException {
+ String fieldName = SqlppVariableUtil.variableNameToDisplayedFieldName(varExpr.getVar().getValue());
+ return generateFieldBinding(fieldName, varExpr, outFieldNames, varExpr.getSourceLocation());
+ }
+
+ // Generates a field binding for a named projection.
+ private FieldBinding getFieldBinding(Projection projection, Set<String> outFieldNames) throws CompilationException {
+ String fieldName = projection.getName();
+ Expression fieldValueExpr = projection.getExpression();
+ return generateFieldBinding(fieldName, fieldValueExpr, outFieldNames, projection.getSourceLocation());
+ }
+
+ private FieldBinding generateFieldBinding(String fieldName, Expression fieldValueExpr, Set<String> outFieldNames,
+ SourceLocation sourceLoc) throws CompilationException {
+ if (!outFieldNames.add(fieldName)) {
+ throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, sourceLoc, fieldName);
+ }
+ return new FieldBinding(new LiteralExpr(new StringLiteral(fieldName)), fieldValueExpr);
}
@Override
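For context, the duplicate-name detection introduced above relies on java.util.Set.add returning false when the element is already present, so the first repeated projection name fails fast with its source location. A minimal, self-contained sketch of that pattern (the class name and exception message below are illustrative, not part of the patch):

    import java.util.HashSet;
    import java.util.Set;

    public class DuplicateNameCheckSketch {
        public static void main(String[] args) {
            Set<String> fieldNames = new HashSet<>();
            for (String name : new String[] { "a", "b", "a" }) {
                // Set.add returns false for a repeated element, mirroring
                // generateFieldBinding's check before building a FieldBinding.
                if (!fieldNames.add(name)) {
                    throw new IllegalStateException("Duplicate field name \"" + name + "\"");
                }
            }
        }
    }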
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 5d722e7..0172b28 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -47,7 +47,6 @@
private final IRetryPolicyFactory retryPolicyFactory;
private final MetadataProvider metadataProvider;
private final IClusterStateManager clusterStateManager;
- private Exception failure;
public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener,
IRetryPolicyFactory retryPolicyFactory) {
@@ -105,50 +104,46 @@
}
}
- protected Void doRecover(IRetryPolicy policy)
- throws AlgebricksException, HyracksDataException, InterruptedException {
+ protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
LOGGER.log(level, "Actual Recovery task has started");
- if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
- LOGGER.log(level, "but its state is not temp failure and so we're just returning");
- return null;
- }
- LOGGER.log(level, "calling the policy");
+ Exception failure = null;
while (policy.retry(failure)) {
synchronized (listener) {
- if (cancelRecovery) {
- return null;
- }
- while (clusterStateManager.getState() != ClusterState.ACTIVE) {
- if (cancelRecovery) {
- return null;
- }
+ while (!cancelRecovery && clusterStateManager.getState() != ClusterState.ACTIVE) {
listener.wait();
}
+ if (cancelRecovery) {
+ LOGGER.log(level, "Recovery has been cancelled");
+ return null;
+ }
}
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
- lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
- listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
- for (Dataset dataset : listener.getDatasets()) {
- lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
- lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
- DatasetUtil.getFullyQualifiedName(dataset));
- }
- synchronized (listener) {
- try {
- if (cancelRecovery) {
- return null;
- }
- listener.setState(ActivityState.RECOVERING);
- listener.doStart(metadataProvider);
- return null;
- } catch (Exception e) {
- LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
- listener.setState(ActivityState.TEMPORARILY_FAILED);
- failure = e;
- } finally {
- metadataProvider.getLocks().reset();
+ try {
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+ listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+ for (Dataset dataset : listener.getDatasets()) {
+ lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
+ lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+ DatasetUtil.getFullyQualifiedName(dataset));
}
- listener.notifyAll();
+ synchronized (listener) {
+ try {
+ if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+ listener.setState(ActivityState.RECOVERING);
+ listener.doStart(metadataProvider);
+ }
+ LOGGER.log(level, "Recovery completed successfully");
+ return null;
+ } finally {
+ listener.notifyAll();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
+ listener.setState(ActivityState.TEMPORARILY_FAILED);
+ failure = e;
+ } finally {
+ metadataProvider.getLocks().reset();
}
}
// Recovery task is essentially over now, either through failure or through cancellation (stop)
@@ -160,6 +155,9 @@
// 1. set the state to permanent failure.
// 2. set the entity to not running to avoid auto recovery attempt
&& listener.getState() != ActivityState.SUSPENDED) {
+ LOGGER.log(level, "Recovery is cancelled because the current state {} is neither {} nor {}",
+ listener.getState(), ActivityState.TEMPORARILY_FAILED,
+ ActivityState.SUSPENDED);
return null;
}
}
@@ -172,10 +170,7 @@
DatasetUtil.getFullyQualifiedName(dataset));
}
synchronized (listener) {
- if (cancelRecovery) {
- return null;
- }
- if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+ if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
listener.setState(ActivityState.STOPPED);
listener.setRunning(metadataProvider, false);
@@ -187,8 +182,4 @@
}
return null;
}
-
- public Exception getFailure() {
- return failure;
- }
}
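The reworked doRecover above keeps the last failure in a local variable and feeds it back to the retry policy on each iteration, instead of storing it on the task (hence the removed failure field and getFailure accessor). A minimal sketch of that loop shape, assuming a RetryPolicy stand-in for IRetryPolicy:

    public class RetryLoopSketch {
        // Illustrative stand-in for IRetryPolicy: decides whether to attempt
        // again given the most recent failure (null before the first attempt).
        interface RetryPolicy {
            boolean retry(Exception lastFailure);
        }

        static void run(RetryPolicy policy) {
            Exception failure = null;
            while (policy.retry(failure)) {
                try {
                    // attempt the recoverable action here
                    return;
                } catch (Exception e) {
                    failure = e; // remembered locally, consulted on the next iteration
                }
            }
        }

        public static void main(String[] args) {
            run(lastFailure -> lastFailure == null); // allow only the first attempt
        }
    }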
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
index 9ad870c..84f7831 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
@@ -54,7 +54,7 @@
ecodeSent = true;
ccb.reportLocalEOS();
adjustChannelWritability();
- } else if (eos && !eosSent) {
+ } else if (isPendingCloseWrite()) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
writerState.getCommand().setData(0);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 22dec60..e67246a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -65,7 +65,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.stubbing.Answer;
public class CheckpointingTest {
@@ -134,7 +133,7 @@
ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
// Number of log files after node startup should be one
- int numberOfLogFiles = logManager.getLogFileIds().size();
+ int numberOfLogFiles = logManager.getOrderedLogFileIds().size();
Assert.assertEquals(1, numberOfLogFiles);
// Low-water mark LSN
@@ -142,10 +141,10 @@
// Low-water mark log file id
long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN);
// Initial Low-water mark should be in the only available log file
- Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue());
+ Assert.assertEquals(initialLowWaterMarkFileId, logManager.getOrderedLogFileIds().get(0).longValue());
// Insert records until a new log file is created
- while (logManager.getLogFileIds().size() == 1) {
+ while (logManager.getOrderedLogFileIds().size() == 1) {
ITupleReference tuple = tupleGenerator.next();
DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
}
@@ -160,9 +159,9 @@
* the low-water mark is still in it (i.e. it is still required for
* recovery)
*/
- int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
+ int numberOfLogFilesBeforeCheckpoint = logManager.getOrderedLogFileIds().size();
checkpointManager.tryCheckpoint(logManager.getAppendLSN());
- int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
+ int numberOfLogFilesAfterCheckpoint = logManager.getOrderedLogFileIds().size();
Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
/*
@@ -203,7 +202,7 @@
checkpointManager.tryCheckpoint(lowWaterMarkLSN);
// Validate initialLowWaterMarkFileId was deleted
- for (Long fileId : logManager.getLogFileIds()) {
+ for (Long fileId : logManager.getOrderedLogFileIds()) {
Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
}
@@ -298,7 +297,7 @@
// Make sure the valid checkout wouldn't force full recovery
Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN);
// Add a corrupted (empty) checkpoint file with a timestamp > than current checkpoint
- Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1);
+ Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId() + 1);
File corruptedCheckpoint = corruptedCheckpointPath.toFile();
corruptedCheckpoint.createNewFile();
// Make sure the corrupted checkpoint file was created
@@ -306,11 +305,11 @@
// Try to get the latest checkpoint again
Checkpoint cpAfterCorruption = checkpointManager.getLatest();
// Make sure the valid checkpoint was returned
- Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp());
+ Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
// Make sure the corrupted checkpoint file was deleted
Assert.assertFalse(corruptedCheckpoint.exists());
// Corrupt the valid checkpoint by replacing its content
- final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp());
+ final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
File validCheckpointFile = validCheckpointPath.toFile();
Assert.assertTrue(validCheckpointFile.exists());
// Delete the valid checkpoint file and create it as an empty file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index 4453a1d..964bf66 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -147,7 +147,7 @@
public void interruptedLogFileSwitch() throws Exception {
final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager();
- int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
+ int logFileCountBeforeInterrupt = logManager.getOrderedLogFileIds().size();
// ensure an interrupted transactor will create next log file but will fail to position the log channel
final AtomicBoolean failed = new AtomicBoolean(false);
@@ -162,7 +162,7 @@
interruptedTransactor.start();
interruptedTransactor.join();
// ensure a new log file was created and survived interrupt
- int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+ int logFileCountAfterInterrupt = logManager.getOrderedLogFileIds().size();
Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
Assert.assertFalse(failed.get());
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.01.ddl.sqlpp
new file mode 100644
index 0000000..12a6034
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.01.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.empInfoType as
+{
+ id : bigint
+};
+
+create dataset empDataset(empInfoType) primary key id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.02.update.sqlpp
new file mode 100644
index 0000000..04ab807
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.02.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load dataset empDataset using localfs ((`path`=`asterix_nc1://data/types/empDataset.adm`),(`format`=`adm`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.03.query.sqlpp
new file mode 100644
index 0000000..9577855
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.03.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ * Project expression and constant
+ */
+
+use test;
+
+select empno as a, 2 as a
+from empDataset
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.04.query.sqlpp
new file mode 100644
index 0000000..5eac30e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.04.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ * Project expression and expression
+ */
+
+use test;
+
+select empno as b, name as b
+from empDataset
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.05.query.sqlpp
new file mode 100644
index 0000000..ea4372a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.05.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ * FROM variable and another field
+ */
+
+use test;
+
+select c, empno as c
+from empDataset c
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.06.query.sqlpp
new file mode 100644
index 0000000..788577b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.06.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ * Project * (FROM variable) and another field
+ */
+
+use test;
+
+select *, empno as d
+from empDataset d
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.07.query.sqlpp
new file mode 100644
index 0000000..232d29c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.07.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ * Project * (LET variable) and another field
+ */
+
+use test;
+
+select *, emp.name as e
+from empDataset emp
+let e = emp.deptno
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.08.query.sqlpp
new file mode 100644
index 0000000..48c96a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.08.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ * Project * (GROUP BY key variable) and another field
+ */
+
+use test;
+
+select *, count(*) as f
+from empDataset emp
+group by emp.age f
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.09.query.sqlpp
new file mode 100644
index 0000000..9655bb7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.09.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ * Project * (LET after GROUP BY variable) and another field
+ */
+
+use test;
+
+select *, count(*) as g
+from empDataset emp
+group by emp.age f
+let g = f + 1
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6edc7bf..9bb995e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -100,6 +100,18 @@
<expected-error>ASX0013: Duplicate field name "e"</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="flwor">
+ <compilation-unit name="query-ASTERIXDB-2446-2">
+ <output-dir compare="Text">query-ASTERIXDB-883</output-dir>
+ <expected-error>ASX0013: Duplicate field name "a" (in line 27, at column 20)</expected-error>
+ <expected-error>ASX0013: Duplicate field name "b" (in line 27, at column 20)</expected-error>
+ <expected-error>ASX0013: Duplicate field name "c" (in line 27, at column 11)</expected-error>
+ <expected-error>ASX0013: Duplicate field name "d" (in line 27, at column 11)</expected-error>
+ <expected-error>ASX0013: Duplicate field name "e" (in line 27, at column 14)</expected-error>
+ <expected-error>ASX0013: Duplicate field name "f" (in line 27, at column 11)</expected-error>
+ <expected-error>ASX0013: Duplicate field name "g" (in line 27, at column 11)</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="explain">
<test-case FilePath="explain">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index f84167e..9654473 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -24,6 +24,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -31,6 +33,7 @@
public class IndexCheckpoint {
+ private static final Logger LOGGER = LogManager.getLogger();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long INITIAL_CHECKPOINT_ID = 0;
private long id;
@@ -52,6 +55,9 @@
public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
long lastComponentId) {
if (lowWatermark < latest.getLowWatermark()) {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("low watermark {} is less than the low watermark of the latest checkpoint {}", lowWatermark, latest);
+ }
throw new IllegalStateException("Low watermark should always be increasing");
}
IndexCheckpoint next = new IndexCheckpoint();
@@ -104,4 +110,13 @@
throw HyracksDataException.create(e);
}
}
+
+ @Override
+ public String toString() {
+ try {
+ return asJson();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index 7f1a1c7..8fe0353 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -18,28 +18,29 @@
*/
package org.apache.asterix.common.transactions;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class Checkpoint implements Comparable<Checkpoint>, IJsonSerializable {
private static final long serialVersionUID = 1L;
private final long checkpointLsn;
private final long minMCTFirstLsn;
private final long maxTxnId;
- private final long timeStamp;
private final boolean sharp;
private final int storageVersion;
+ private long id;
- public Checkpoint(long checkpointLsn, long minMCTFirstLsn, long maxTxnId, long timeStamp, boolean sharp,
+ public Checkpoint(long id, long checkpointLsn, long minMCTFirstLsn, long maxTxnId, boolean sharp,
int storageVersion) {
+ this.id = id;
this.checkpointLsn = checkpointLsn;
this.minMCTFirstLsn = minMCTFirstLsn;
this.maxTxnId = maxTxnId;
- this.timeStamp = timeStamp;
this.sharp = sharp;
this.storageVersion = storageVersion;
}
@@ -56,8 +57,8 @@
return maxTxnId;
}
- public long getTimeStamp() {
- return timeStamp;
+ public long getId() {
+ return id;
}
public boolean isSharp() {
@@ -69,68 +70,47 @@
}
@Override
- public int compareTo(Checkpoint checkpoint) {
- long compareTimeStamp = checkpoint.getTimeStamp();
-
- // Descending order
- long diff = compareTimeStamp - this.timeStamp;
- if (diff > 0) {
- return 1;
- } else if (diff == 0) {
- return 0;
- } else {
- return -1;
- }
+ public int compareTo(Checkpoint other) {
+ return Long.compare(this.id, other.id);
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
}
- if (obj == null) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
- if (!(obj instanceof Checkpoint)) {
- return false;
- }
- Checkpoint other = (Checkpoint) obj;
- return compareTo(other) == 0;
+ Checkpoint that = (Checkpoint) o;
+ return id == that.id;
}
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32));
- result = prime * result + Long.hashCode(maxTxnId);
- result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32));
- result = prime * result + (sharp ? 1231 : 1237);
- result = prime * result + storageVersion;
- result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32));
- return result;
+ return Long.hashCode(id);
}
@Override
public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
final ObjectNode checkpointJson = registry.getClassIdentifier(getClass(), serialVersionUID);
+ checkpointJson.put("id", id);
checkpointJson.put("checkpointLsn", checkpointLsn);
checkpointJson.put("minMCTFirstLsn", minMCTFirstLsn);
checkpointJson.put("maxTxnId", maxTxnId);
- checkpointJson.put("timeStamp", timeStamp);
- checkpointJson.put("sharp", timeStamp);
+ checkpointJson.put("sharp", sharp);
checkpointJson.put("storageVersion", storageVersion);
return checkpointJson;
}
@SuppressWarnings("squid:S1172") // unused parameter
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+ long id = json.get("id").asLong();
long checkpointLsn = json.get("checkpointLsn").asLong();
long minMCTFirstLsn = json.get("minMCTFirstLsn").asLong();
long maxTxnId = json.get("maxTxnId").asLong();
- long timeStamp = json.get("timeStamp").asLong();
boolean sharp = json.get("sharp").asBoolean();
int storageVersion = json.get("storageVersion").asInt();
- return new Checkpoint(checkpointLsn, minMCTFirstLsn, maxTxnId, timeStamp, sharp, storageVersion);
+ return new Checkpoint(id, checkpointLsn, minMCTFirstLsn, maxTxnId, sharp, storageVersion);
}
}
\ No newline at end of file
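Checkpoint ordering now uses a monotonically increasing id compared with Long.compare, so equals, hashCode, and compareTo all agree on the same key and the latest checkpoint is simply the largest id. A small sketch of selecting the latest checkpoint under that ordering (the class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class CheckpointOrderingSketch {
        // Ascending sort by id puts the most recent checkpoint last,
        // matching how getLatest picks the tail of the ordered list.
        static long latestId(List<Long> checkpointIds) {
            List<Long> ordered = new ArrayList<>(checkpointIds);
            Collections.sort(ordered);
            return ordered.get(ordered.size() - 1);
        }

        public static void main(String[] args) {
            System.out.println(latestId(Arrays.asList(2L, 0L, 1L))); // prints 2
        }
    }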
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index e3cf8b8..36cea55 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.common.transactions;
-import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -26,10 +25,8 @@
/**
* @return The latest checkpoint on disk if any exists. Otherwise null.
- * @throws ACIDException
- * when a checkpoint file cannot be read.
*/
- Checkpoint getLatest() throws ACIDException;
+ Checkpoint getLatest();
/**
* Performs a sharp checkpoint.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 19b006f..644f3c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -41,7 +41,7 @@
/**
* The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
*/
- private static final int LOCAL_STORAGE_VERSION = 4;
+ private static final int LOCAL_STORAGE_VERSION = 5;
/**
* The storage version of AsterixDB stack.
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index ba8074fe..b855981 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -206,6 +206,12 @@
public void finish() throws HyracksDataException {
lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
}
+
+ @Override
+ public void fail(Throwable th) {
+ // We must fail before we exit the components
+ frameOpCallback.fail(th);
+ }
};
tracer = ctx.getJobletContext().getServiceContext().getTracer();
traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
@@ -314,12 +320,7 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int itemCount = accessor.getTupleCount();
- try {
- lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
- } catch (Throwable th) {// NOSONAR: Must notify of all failures
- frameOpCallback.fail(th);
- throw th;
- }
+ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
if (itemCount > 0) {
lastRecordInTimeStamp = System.currentTimeMillis();
}
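The upsert change above moves failure reporting from a try/catch around batchOperate into the processor's own fail hook, so the frame operation callback observes the failure before the LSM components are exited. A toy illustration of that ordering constraint (every name here is illustrative, not the operator's real API):

    public class FailBeforeExitSketch {
        interface FrameOpCallback {
            void fail(Throwable th);
        }

        static void enterComponents() { System.out.println("components entered"); }
        static void exitComponents() { System.out.println("components exited"); }

        public static void main(String[] args) {
            FrameOpCallback callback = th -> System.out.println("callback notified: " + th.getMessage());
            try {
                enterComponents();
                try {
                    throw new RuntimeException("simulated batch failure");
                } catch (Throwable th) {
                    callback.fail(th); // notify while the components are still entered
                    throw th;
                } finally {
                    exitComponents();
                }
            } catch (RuntimeException e) {
                // failure was already reported to the callback above
            }
        }
    }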
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index c0d18df..0a6dda9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -29,7 +29,6 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -103,7 +102,8 @@
nodeId = txnSubsystem.getId();
flushLogsQ = new LinkedBlockingQueue<>();
txnSubsystem.getApplicationContext().getThreadExecutor().execute(new FlushLogsLogger());
- initializeLogManager(SMALLEST_LOG_FILE_ID);
+ final long onDiskMaxLogFileId = getOnDiskMaxLogFileId();
+ initializeLogManager(onDiskMaxLogFileId);
}
private void initializeLogManager(long nextLogFileId) {
@@ -365,56 +365,32 @@
}
}
- private long initializeLogAnchor(long nextLogFileId) {
- long fileId = 0;
- long offset = 0;
- File fileLogDir = new File(logDir);
- try {
- if (fileLogDir.exists()) {
- List<Long> logFileIds = getLogFileIds();
- if (logFileIds.isEmpty()) {
- fileId = nextLogFileId;
- createFileIfNotExists(getLogFilePath(fileId));
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("created a log file: " + getLogFilePath(fileId));
- }
- } else {
- fileId = logFileIds.get(logFileIds.size() - 1);
- File logFile = new File(getLogFilePath(fileId));
- offset = logFile.length();
- }
- } else {
- fileId = nextLogFileId;
- createNewDirectory(logDir);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
- }
- createFileIfNotExists(getLogFilePath(fileId));
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("created a log file: " + getLogFilePath(fileId));
- }
- }
- } catch (IOException ioe) {
- throw new IllegalStateException("Failed to initialize the log anchor", ioe);
- }
+ private long initializeLogAnchor(long fileId) {
+ final String logFilePath = getLogFilePath(fileId);
+ createFileIfNotExists(logFilePath);
+ final File logFile = new File(logFilePath);
+ long offset = logFile.length();
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("log file Id: " + fileId + ", offset: " + offset);
+ LOGGER.info("initializing log anchor with log file Id: {} at offset: {}", fileId, offset);
}
- return logFileSize * fileId + offset;
+ return getLogFileFirstLsn(fileId) + offset;
}
@Override
public void renewLogFiles() {
terminateLogFlusher();
closeCurrentLogFile();
- long lastMaxLogFileId = deleteAllLogFiles();
- initializeLogManager(lastMaxLogFileId + 1);
+ long nextLogFileId = getNextLogFileId();
+ createFileIfNotExists(getLogFilePath(nextLogFileId));
+ final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
+ deleteOldLogFiles(logFileFirstLsn);
+ initializeLogManager(nextLogFileId);
}
@Override
public void deleteOldLogFiles(long checkpointLSN) {
Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
- List<Long> logFileIds = getLogFileIds();
+ List<Long> logFileIds = getOrderedLogFileIds();
if (!logFileIds.isEmpty()) {
//sort log files from oldest to newest
Collections.sort(logFileIds);
@@ -461,24 +437,7 @@
}
}
- private long deleteAllLogFiles() {
- List<Long> logFileIds = getLogFileIds();
- if (!logFileIds.isEmpty()) {
- for (Long id : logFileIds) {
- File file = new File(getLogFilePath(id));
- LOGGER.info("Deleting log file: " + file.getAbsolutePath());
- if (!file.delete()) {
- throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
- }
- LOGGER.info("log file: " + file.getAbsolutePath() + " was deleted successfully");
- }
- return logFileIds.get(logFileIds.size() - 1);
- } else {
- throw new IllegalStateException("Couldn't find any log files.");
- }
- }
-
- public List<Long> getLogFileIds() {
+ public List<Long> getOrderedLogFileIds() {
File fileLogDir = new File(logDir);
String[] logFileNames = null;
List<Long> logFileIds = null;
@@ -510,12 +469,7 @@
for (String fileName : logFileNames) {
logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1)));
}
- Collections.sort(logFileIds, new Comparator<Long>() {
- @Override
- public int compare(Long arg0, Long arg1) {
- return arg0.compareTo(arg1);
- }
- });
+ logFileIds.sort(Long::compareTo);
return logFileIds;
}
@@ -531,17 +485,21 @@
return lsn / logFileSize;
}
- private static boolean createFileIfNotExists(String path) throws IOException {
- File file = new File(path);
- File parentFile = file.getParentFile();
- if (parentFile != null) {
- parentFile.mkdirs();
+ private static void createFileIfNotExists(String path) {
+ try {
+ File file = new File(path);
+ if (file.exists()) {
+ return;
+ }
+ File parentFile = file.getParentFile();
+ if (parentFile != null) {
+ parentFile.mkdirs();
+ }
+ Files.createFile(file.toPath());
+ LOGGER.info("Created log file {}", path);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to create file in " + path, e);
}
- return file.createNewFile();
- }
-
- private static boolean createNewDirectory(String path) {
- return (new File(path)).mkdir();
}
private void createNextLogFile() throws IOException {
@@ -579,7 +537,7 @@
@Override
public long getReadableSmallestLSN() {
- List<Long> logFileIds = getLogFileIds();
+ List<Long> logFileIds = getOrderedLogFileIds();
if (!logFileIds.isEmpty()) {
return logFileIds.get(0) * logFileSize;
} else {
@@ -629,6 +587,22 @@
fileChannel.close();
}
+ private long getNextLogFileId() {
+ return getOnDiskMaxLogFileId() + 1;
+ }
+
+ private long getLogFileFirstLsn(long logFileId) {
+ return logFileId * logFileSize;
+ }
+
+ private long getOnDiskMaxLogFileId() {
+ final List<Long> logFileIds = getOrderedLogFileIds();
+ if (logFileIds.isEmpty()) {
+ return SMALLEST_LOG_FILE_ID;
+ }
+ return logFileIds.get(logFileIds.size() - 1);
+ }
+
/**
* This class is used to log FLUSH logs.
* FLUSH logs are flushed on a different thread to avoid a possible deadlock in {@link LogBuffer} batchUnlock
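On restart, the constructor change above seeds the log manager from the highest log file id found on disk (getOnDiskMaxLogFileId) instead of always starting at SMALLEST_LOG_FILE_ID, and a file's first LSN is derived as its id times the fixed log file size (getLogFileFirstLsn). A self-contained sketch of that id arithmetic, with both constant values assumed for illustration:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class LogFileIdSketch {
        static final long SMALLEST_LOG_FILE_ID = 0; // assumed default
        static final long LOG_FILE_SIZE = 4L * 1024 * 1024; // illustrative size

        // Mirrors getNextLogFileId: one past the highest id on disk,
        // or one past the smallest id when no log files exist yet.
        static long nextLogFileId(List<Long> orderedLogFileIds) {
            if (orderedLogFileIds.isEmpty()) {
                return SMALLEST_LOG_FILE_ID + 1;
            }
            return orderedLogFileIds.get(orderedLogFileIds.size() - 1) + 1;
        }

        // Mirrors getLogFileFirstLsn: LSNs are laid out contiguously.
        static long logFileFirstLsn(long logFileId) {
            return logFileId * LOG_FILE_SIZE;
        }

        public static void main(String[] args) {
            List<Long> ids = Arrays.asList(3L, 1L, 2L);
            Collections.sort(ids);
            System.out.println(nextLogFileId(ids));  // 4
            System.out.println(logFileFirstLsn(4L)); // 16777216
        }
    }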
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 0cbd6c6..e221da8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -60,6 +60,7 @@
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final long SHARP_CHECKPOINT_LSN = -1;
private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+ private static final long FIRST_CHECKPOINT_ID = 0;
private final File checkpointDir;
private final int historyToKeep;
private final int lsnThreshold;
@@ -88,25 +89,118 @@
lsnThreshold = checkpointProperties.getLsnThreshold();
pollFrequency = checkpointProperties.getPollFrequency();
// We must keep at least the latest checkpoint
- historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 1 : checkpointProperties.getHistoryToKeep();
+ historyToKeep = checkpointProperties.getHistoryToKeep() + 1;
persistedResourceRegistry = txnSubsystem.getApplicationContext().getPersistedResourceRegistry();
}
@Override
- public Checkpoint getLatest() throws ACIDException {
- // Read all checkpointObjects from the existing checkpoint files
+ public Checkpoint getLatest() {
LOGGER.log(Level.INFO, "Getting latest checkpoint");
+ final List<File> checkpointFiles = getCheckpointFiles();
+ if (checkpointFiles.isEmpty()) {
+ return null;
+ }
+ final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+ if (orderedCheckpoints.isEmpty()) {
+ /*
+ * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
+ * We will forge a checkpoint that forces recovery to start from the beginning of the log.
+ * This shouldn't happen unless hardware corruption occurs.
+ */
+ return forgeForceRecoveryCheckpoint();
+ }
+ return orderedCheckpoints.get(orderedCheckpoints.size() - 1);
+ }
+
+ @Override
+ public void start() {
+ checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
+ checkpointer.start();
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream outputStream) throws IOException {
+ checkpointer.shutdown();
+ checkpointer.interrupt();
+ try {
+ // Wait until checkpoint thread stops
+ checkpointer.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ // Nothing to dump
+ }
+
+ public Path getCheckpointPath(long checkpointId) {
+ return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
+ + Long.toString(checkpointId));
+ }
+
+ protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
+ ILogManager logMgr = txnSubsystem.getLogManager();
+ ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
+ final long nextCheckpointId = getNextCheckpointId();
+ final Checkpoint checkpointObject = new Checkpoint(nextCheckpointId, logMgr.getAppendLSN(), minMCTFirstLSN,
+ txnMgr.getMaxTxnId(), sharp, StorageConstants.VERSION);
+ persist(checkpointObject);
+ cleanup();
+ }
+
+ private Checkpoint forgeForceRecoveryCheckpoint() {
+ /*
+ * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
+ * the first available log.
+ * We set the storage version to the current version. If there is a version mismatch, it will be detected
+ * during recovery.
+ */
+ return new Checkpoint(FIRST_CHECKPOINT_ID, Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, false,
+ StorageConstants.VERSION);
+ }
+
+ private void persist(Checkpoint checkpoint) throws HyracksDataException {
+ // Get checkpoint file path
+ Path path = getCheckpointPath(checkpoint.getId());
+
+ if (LOGGER.isInfoEnabled()) {
+ File file = path.toFile();
+ LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
+ + (file.exists() ? "already exists" : "doesn't exist yet"));
+ }
+ // Write checkpoint file to disk
+ try {
+ byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
+ Files.write(path, bytes);
+ } catch (IOException e) {
+ LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
+ throw HyracksDataException.create(e);
+ }
+ if (LOGGER.isInfoEnabled()) {
+ File file = path.toFile();
+ LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
+ + (file.exists() ? "exists" : "still doesn't exist"));
+ }
+ }
+
+ private List<File> getCheckpointFiles() {
File[] checkpoints = checkpointDir.listFiles(filter);
if (checkpoints == null || checkpoints.length == 0) {
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO,
"Listing of files in the checkpoint dir returned " + (checkpoints == null ? "null" : "empty"));
}
- return null;
+ return Collections.emptyList();
}
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO, "Listing of files in the checkpoint dir returned " + Arrays.toString(checkpoints));
}
+ return Arrays.asList(checkpoints);
+ }
+
+ private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
List<Checkpoint> checkpointObjectList = new ArrayList<>();
for (File file : checkpoints) {
try {
@@ -134,106 +228,30 @@
}
}
}
- /**
- * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
- * We will forge a checkpoint that forces recovery to start from the beginning of the log.
- * This shouldn't happen unless a hardware corruption happens.
- */
- if (checkpointObjectList.isEmpty()) {
- LOGGER.error("All checkpoint files are corrupted. Forcing recovery from the beginning of the log");
- checkpointObjectList.add(forgeForceRecoveryCheckpoint());
- }
-
- // Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
Collections.sort(checkpointObjectList);
-
- // Return the most recent one (the first one in sorted list)
- return checkpointObjectList.get(0);
- }
-
- @Override
- public void start() {
- checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
- checkpointer.start();
- }
-
- @Override
- public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
- checkpointer.shutdown();
- checkpointer.interrupt();
- try {
- // Wait until checkpoint thread stops
- checkpointer.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void dumpState(OutputStream os) throws IOException {
- // Nothing to dump
- }
-
- public Path getCheckpointPath(long checkpointTimestamp) {
- return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
- + Long.toString(checkpointTimestamp));
- }
-
- protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
- ILogManager logMgr = txnSubsystem.getLogManager();
- ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
- Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxTxnId(),
- System.currentTimeMillis(), sharp, StorageConstants.VERSION);
- persist(checkpointObject);
- cleanup();
- }
-
- protected Checkpoint forgeForceRecoveryCheckpoint() {
- /**
- * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
- * the first available log.
- * We set the storage version to the current version. If there is a version mismatch, it will be detected
- * during recovery.
- */
- return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false,
- StorageConstants.VERSION);
- }
-
- private void persist(Checkpoint checkpoint) throws HyracksDataException {
- // Get checkpoint file path
- Path path = getCheckpointPath(checkpoint.getTimeStamp());
-
- if (LOGGER.isInfoEnabled()) {
- File file = path.toFile();
- LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
- + (file.exists() ? "already exists" : "doesn't exist yet"));
- }
- // Write checkpoint file to disk
- try {
- byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
- Files.write(path, bytes);
- } catch (IOException e) {
- LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
- throw HyracksDataException.create(e);
- }
- if (LOGGER.isInfoEnabled()) {
- File file = path.toFile();
- LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
- + (file.exists() ? "exists" : " still doesn't exist"));
- }
+ return checkpointObjectList;
}
private void cleanup() {
- File[] checkpointFiles = checkpointDir.listFiles(filter);
- // Sort the filenames lexicographically to keep the latest checkpoint history files.
- Arrays.sort(checkpointFiles);
- for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("Deleting checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
- }
- if (!checkpointFiles[i].delete() && LOGGER.isWarnEnabled()) {
- LOGGER.warn("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
+ final List<File> checkpointFiles = getCheckpointFiles();
+ final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+ final int deleteCount = orderedCheckpoints.size() - historyToKeep;
+ for (int i = 0; i < deleteCount; i++) {
+ final Checkpoint checkpoint = orderedCheckpoints.get(i);
+ final Path checkpointPath = getCheckpointPath(checkpoint.getId());
+ LOGGER.warn("Deleting checkpoint file at: {}", checkpointPath);
+ if (!checkpointPath.toFile().delete()) {
+ LOGGER.warn("Could not delete checkpoint file at: {}", checkpointPath);
}
}
}
+
+ private long getNextCheckpointId() {
+ final Checkpoint latest = getLatest();
+ if (latest == null) {
+ return FIRST_CHECKPOINT_ID;
+ }
+ return latest.getId() + 1;
+ }
+
}
\ No newline at end of file
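Reviewer note: the substantive change in this file is the switch from wall-clock timestamps to monotonically increasing checkpoint ids for naming, ordering, and pruning checkpoint files. The standalone sketch below illustrates the id contract under the assumption that FIRST_CHECKPOINT_ID is 0; it is not the patched class itself.

import java.util.OptionalLong;

// Monotonic ids never collide and never go backwards when the system clock
// is adjusted, unlike names derived from System.currentTimeMillis().
final class CheckpointIdDemo {
    static final long FIRST_CHECKPOINT_ID = 0; // assumption

    static long nextCheckpointId(OptionalLong latestId) {
        return latestId.isPresent() ? latestId.getAsLong() + 1 : FIRST_CHECKPOINT_ID;
    }

    public static void main(String[] args) {
        System.out.println(nextCheckpointId(OptionalLong.empty())); // 0
        System.out.println(nextCheckpointId(OptionalLong.of(41)));  // 42
    }
}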
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6efd0e5..ce523db 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -38,7 +38,7 @@
public class CheckpointManager extends AbstractCheckpointManager {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long NO_SECURED_LSN = -1l;
+ private static final long NO_SECURED_LSN = -1L;
private final Map<TxnId, Long> securedLSNs;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ac7168..7c8fb34 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -107,6 +107,7 @@
public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
writer.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+ return;
}
List<IPartition> pList = availablePartitionMap.get(partitionId);
if (pList != null && !pList.isEmpty()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0a28e93..5c927f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,7 +75,7 @@
if (writableDataPresent) {
return credits > 0;
}
- if (eos && !eosSent) {
+ if (isPendingCloseWrite()) {
return true;
}
return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
@@ -116,6 +116,10 @@
return credits;
}
+ protected boolean isPendingCloseWrite() {
+ return eos && !eosSent && !ecodeSent;
+ }
+
private class CloseableBufferAcceptor implements ICloseableBufferAcceptor {
@Override
public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index 628007d..3f4618b 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -58,7 +58,7 @@
ecodeSent = true;
ccb.reportLocalEOS();
adjustChannelWritability();
- } else if (eos && !eosSent) {
+ } else if (isPendingCloseWrite()) {
writerState.getCommand().setChannelId(channelId);
writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
writerState.getCommand().setData(0);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
index 7ac49c6..dc59612 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
@@ -36,7 +36,7 @@
* Storage version #. Change this if you alter any tree frame formats to stop
* possible corruption from old versions reading new formats.
*/
- public static final int VERSION = 6;
+ public static final int VERSION = 7;
public static final int TUPLE_COUNT_OFFSET = 0;
public static final int FREE_SPACE_OFFSET = TUPLE_COUNT_OFFSET + 4;
public static final int LEVEL_OFFSET = FREE_SPACE_OFFSET + 4;
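Reviewer note: as the comment says, the VERSION bump exists so that a binary reading frames written under a different layout fails fast instead of silently misreading pages. A hypothetical open-time guard showing the intended use (the check itself is not part of this diff):

// Hypothetical guard, for illustration only.
static void checkFrameVersion(int onDiskVersion) {
    final int expected = 7; // ITreeIndexFrame.VERSION after this change
    if (onDiskVersion != expected) {
        throw new IllegalStateException(
                "tree frame version mismatch: on-disk " + onDiskVersion + ", expected " + expected);
    }
}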
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index 3fbe6cd..b6192c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -43,4 +43,11 @@
* Called once per batch before ending the batch process
*/
void finish() throws HyracksDataException;
+
+ /**
+ * Called when a failure is encountered while processing a frame
+ *
+ * @param th the cause of the failure
+ */
+ void fail(Throwable th);
}
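Reviewer note: implementers of IFrameTupleProcessor now receive a failure callback. Below is a hypothetical minimal implementation, assuming the common pattern of recording the first failure so finish() can distinguish a normal batch end from an aborted one (method signatures simplified; the real finish() throws HyracksDataException):

// Hypothetical implementation sketch; a real processor would also release
// any per-frame resources on failure.
final class RecordingFrameTupleProcessor {
    private Throwable failure;

    public void fail(Throwable th) {
        if (failure == null) {
            failure = th; // keep the first failure; later ones are secondary
        }
    }

    public void finish() {
        if (failure != null) {
            // roll back partial batch state instead of committing it
        }
    }
}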
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 1f481c9..904029b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -81,12 +81,13 @@
(dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
protected static final Comparator<String> cmp = new FileNameComparator();
private static final FilenameFilter dummyFilter = (dir, name) -> true;
-
+ private static final long UNINITIALIZED_COMPONENT_SEQ = -1;
protected final IIOManager ioManager;
// baseDir should reflect dataset name and partition name and be absolute
protected final FileReference baseDir;
protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
+ private long lastUsedComponentSeq = UNINITIALIZED_COMPONENT_SEQ;
public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeFactory) {
@@ -355,11 +356,18 @@
}
protected String getNextComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
+ if (lastUsedComponentSeq == UNINITIALIZED_COMPONENT_SEQ) {
+ lastUsedComponentSeq = getOnDiskLastUsedComponentSequence(filenameFilter);
+ }
+ return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
+ }
+
+ private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
long maxComponentSeq = -1;
final String[] files = listDirFiles(baseDir, filenameFilter);
for (String fileName : files) {
maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
}
- return IndexComponentFileReference.getFlushSequence(maxComponentSeq + 1);
+ return maxComponentSeq;
}
}
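Reviewer note: the file manager previously listed the directory on every flush to derive the next component sequence; it now pays that I/O once and increments an in-memory counter afterwards. A standalone sketch of the lazy-scan-then-increment pattern (the file-name parsing here is an assumption, not IndexComponentFileReference):

import java.io.File;
import java.io.FilenameFilter;

final class ComponentSeqDemo {
    private static final long UNINITIALIZED = -1;
    private final File baseDir;
    private long lastUsedSeq = UNINITIALIZED;

    ComponentSeqDemo(File baseDir) { this.baseDir = baseDir; }

    synchronized long nextSeq(FilenameFilter filter) {
        if (lastUsedSeq == UNINITIALIZED) {
            lastUsedSeq = scanMaxSeq(filter); // one directory listing, first use only
        }
        return ++lastUsedSeq;
    }

    private long scanMaxSeq(FilenameFilter filter) {
        long max = -1;
        String[] files = baseDir.list(filter);
        if (files != null) {
            for (String name : files) {
                int idx = name.lastIndexOf('_'); // assumed name format: <base>_<seq>
                if (idx >= 0) {
                    max = Math.max(max, Long.parseLong(name.substring(idx + 1)));
                }
            }
        }
        return max;
    }
}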
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index aa7be86..e9f6f20 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -700,6 +700,9 @@
try {
processFrame(accessor, tuple, processor);
frameOpCallback.frameCompleted();
+ } catch (Throwable th) {
+ processor.fail(th);
+ throw th;
} finally {
processor.finish();
}
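Reviewer note: the ordering in this hunk matters: processor.fail(th) runs before the finally block invokes processor.finish(), so a processor that records the failure (as sketched after IFrameTupleProcessor above) can observe it in finish(), and the rethrow still surfaces the original error to the caller.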
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index f9d9b1b..e2ae73a 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -161,9 +161,12 @@
Field hooksField = clazz.getDeclaredField("hooks");
hooksField.setAccessible(true);
IdentityHashMap hooks = (IdentityHashMap) hooksField.get(null);
- LOGGER.info("the following ({}) shutdown hooks have been registered: {}", hooks::size, hooks::toString);
+ if (hooks != null) {
+ LOGGER.info("the following ({}) shutdown hooks have been registered: {}", hooks::size,
+ hooks::toString);
+ }
} catch (Exception e) {
- LOGGER.warn("ignoring exception trying to determine number of shutdown hooks", e);
+ LOGGER.debug("ignoring exception trying to log shutdown hooks", e);
}
}
}
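Reviewer note: the shutdown-hook dump now tolerates a null hooks map. My reading (an assumption about JDK internals, not something this diff states) is that java.lang.ApplicationShutdownHooks sets its map to null once the hooks start running, so creating the hooks::size method reference during shutdown would throw an NPE. A standalone sketch of the guarded reflective dump; on JDK 9+ it additionally needs --add-opens java.base/java.lang=ALL-UNNAMED:

import java.lang.reflect.Field;
import java.util.IdentityHashMap;

final class ShutdownHookDump {
    static void dump() {
        try {
            Class<?> clazz = Class.forName("java.lang.ApplicationShutdownHooks");
            Field hooksField = clazz.getDeclaredField("hooks");
            hooksField.setAccessible(true);
            IdentityHashMap<?, ?> hooks = (IdentityHashMap<?, ?>) hooksField.get(null);
            if (hooks != null) { // null once the JVM has begun running the hooks
                System.out.println(hooks.size() + " shutdown hooks registered: " + hooks);
            }
        } catch (Exception e) {
            // best effort only; reflection into java.lang may be denied on newer JDKs
        }
    }
}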