Merge branch 'gerrit/neo'
Change-Id: I7ba10b2a046866640ab3d646ea6b71aae399f436
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
index 4e496e4..5408915 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -98,6 +98,7 @@
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.apache.hyracks.util.LogRedactionUtil;
import com.google.common.collect.ImmutableMap;
@@ -338,7 +339,7 @@
IWarningCollector warningCollector = optContext.getWarningCollector();
if (warningCollector.shouldWarn()) {
warningCollector.warn(Warning.of(fieldNameExpr.second.getSourceLocation(),
- ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, fieldName));
+ ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, LogRedactionUtil.userData(fieldName)));
}
iterator.remove();
iterator.next();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java
index 6c4f8c6..e924a4c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveDuplicateFieldsRule.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.util.LogRedactionUtil;
/**
* <pre>
@@ -116,7 +117,7 @@
IWarningCollector warningCollector = context.getWarningCollector();
if (warningCollector.shouldWarn()) {
warningCollector.warn(Warning.of(fieldNameExpr.getSourceLocation(),
- ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, fieldName));
+ ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, LogRedactionUtil.userData(fieldName)));
}
iterator.remove();
iterator.next();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index ea80ea6..3c4537f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -136,6 +136,7 @@
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.util.LogRedactionUtil;
/**
* Each visit returns a pair of an operator and a variable. The variable
@@ -1009,7 +1010,8 @@
private FieldBinding generateFieldBinding(String fieldName, Expression fieldValueExpr, Set<String> outFieldNames,
SourceLocation sourceLoc) throws CompilationException {
if (!outFieldNames.add(fieldName)) {
- throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, sourceLoc, fieldName);
+ throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, sourceLoc,
+ LogRedactionUtil.userData(fieldName));
}
return new FieldBinding(new LiteralExpr(new StringLiteral(fieldName)), fieldValueExpr);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
index c482269..c54cdc1 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
@@ -55,6 +55,7 @@
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.util.LogRedactionUtil;
public class TypeTranslator {
@@ -314,7 +315,8 @@
int i = 0;
for (String s : names) {
if (names.indexOf(s) < i) {
- throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, rtd.getSourceLocation(), s);
+ throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, rtd.getSourceLocation(),
+ LogRedactionUtil.userData(s));
}
fldNames[i++] = s;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
index 248e6d0..e51a539 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/util/ValidateUtil.java
@@ -242,7 +242,7 @@
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
"The field '"
+ LogRedactionUtil.userData(RecordUtil.toFullyQualifiedName(displayFieldName))
- + "' which is " + "of type " + fieldType.getTypeTag()
+ + "' which is of type " + fieldType.getTypeTag()
+ " cannot be indexed using the BTree index.");
}
break;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 96e89a7..290734f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -102,6 +102,7 @@
public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
+ LOGGER.trace("index {} master flush {} -> {}", indexPath, masterLsn, localLsn);
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
latest.getValidComponentSequence(), latest.getLastComponentId(), null);
persist(next);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 5e4b730..ebc8097 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1235,13 +1235,15 @@
if (stmtCreateIndex.isEnforced()) {
if (!projectTypeExpr.isUnknownable()) {
throw new CompilationException(ErrorCode.INDEX_ILLEGAL_ENFORCED_NON_OPTIONAL,
- indexedElement.getSourceLocation(), String.valueOf(projectPath));
+ indexedElement.getSourceLocation(),
+ LogRedactionUtil.userData(String.valueOf(projectPath)));
}
// don't allow creating an enforced index on a closed-type field, fields that
// are part of schema get the field type, if it's not null, then the field is closed-type
if (isFieldFromSchema) {
throw new CompilationException(ErrorCode.INDEX_ILLEGAL_ENFORCED_ON_CLOSED_FIELD,
- indexedElement.getSourceLocation(), String.valueOf(projectPath));
+ indexedElement.getSourceLocation(),
+ LogRedactionUtil.userData(String.valueOf(projectPath)));
}
} else {
if (indexType != IndexType.BTREE && indexType != IndexType.ARRAY) {
@@ -1272,7 +1274,7 @@
if (fieldTypePrime == null) {
throw new CompilationException(ErrorCode.UNKNOWN_TYPE, indexedElement.getSourceLocation(),
- String.valueOf(projectPath));
+ LogRedactionUtil.userData(String.valueOf(projectPath)));
}
validateIndexFieldType(indexType, fieldTypePrime, projectPath, indexedElement.getSourceLocation());
@@ -1284,7 +1286,8 @@
// Try to add the key & its source to the set of keys for duplicate detection.
if (!indexKeysSet.add(indexedElement.toIdentifier())) {
throw new AsterixException(ErrorCode.INDEX_ILLEGAL_REPETITIVE_FIELD,
- indexedElement.getSourceLocation(), indexedElement.getProjectListDisplayForm());
+ indexedElement.getSourceLocation(),
+ LogRedactionUtil.userData(indexedElement.getProjectListDisplayForm()));
}
indexFieldTypes.add(fieldTypes);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index d99f2e9..54793dc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -44,7 +44,7 @@
"replication\.log\.buffer\.numpages" : 8,
"replication\.log\.buffer\.pagesize" : 131072,
"replication\.strategy" : "none",
- "replication\.timeout" : 30,
+ "replication\.timeout" : 120,
"ssl\.enabled" : false,
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 479de3e..06da0ad 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -44,7 +44,7 @@
"replication\.log\.buffer\.numpages" : 8,
"replication\.log\.buffer\.pagesize" : 131072,
"replication\.strategy" : "none",
- "replication\.timeout" : 30,
+ "replication\.timeout" : 120,
"ssl\.enabled" : false,
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 3349f11..bef14c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -44,7 +44,7 @@
"replication\.log\.buffer\.numpages" : 8,
"replication\.log\.buffer\.pagesize" : 131072,
"replication\.strategy" : "none",
- "replication\.timeout" : 30,
+ "replication\.timeout" : 120,
"ssl\.enabled" : false,
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
index dd42936..ada3875 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
@@ -48,7 +48,7 @@
"The size in bytes to replicate in each batch"),
REPLICATION_TIMEOUT(
LONG,
- TimeUnit.SECONDS.toSeconds(30),
+ TimeUnit.SECONDS.toSeconds(120),
"The time in seconds to timeout waiting for master or replica to ack"),
REPLICATION_ENABLED(BOOLEAN, false, "Whether or not data replication is enabled"),
REPLICATION_FACTOR(NONNEGATIVE_INTEGER, 2, "Number of replicas (backups) to maintain per master replica"),
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/JRecord.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/JRecord.java
index 9bd6461..622533b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/JRecord.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/base/JRecord.java
@@ -40,6 +40,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.LogRedactionUtil;
public final class JRecord extends JComplexObject<Map<String, Object>> {
@@ -89,7 +90,8 @@
// check open part
IJObject fieldValue = openFields.get(fieldName);
if (fieldValue == null) {
- throw new RuntimeDataException(ErrorCode.LIBRARY_JAVA_JOBJECTS_UNKNOWN_FIELD, fieldName);
+ throw new RuntimeDataException(ErrorCode.LIBRARY_JAVA_JOBJECTS_UNKNOWN_FIELD,
+ LogRedactionUtil.userData(fieldName));
}
return fieldValue;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 4a46717..d75b84d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -55,6 +55,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.LogRedactionUtil;
/**
* Parser for ADM formatted data.
@@ -472,7 +473,7 @@
fieldId = recBuilder.getFieldId(fldName);
if ((fieldId < 0) && !recType.isOpen()) {
throw new ParseException(ErrorCode.PARSER_ADM_DATA_PARSER_EXTRA_FIELD_IN_CLOSED_RECORD,
- fldName);
+ LogRedactionUtil.userData(fldName));
} else if ((fieldId < 0) && recType.isOpen()) {
parseString(tmpTokenImage.getBuffer(), tmpTokenImage.getBegin() + 1,
tmpTokenImage.getLength() - 2, fieldNameBuffer.getDataOutput());
@@ -527,7 +528,7 @@
final int nullableFieldId = checkOptionalConstraints(recType, nulls);
if (nullableFieldId != -1) {
throw new ParseException(ErrorCode.PARSER_ADM_DATA_PARSER_FIELD_NOT_NULL,
- recType.getFieldNames()[nullableFieldId]);
+ LogRedactionUtil.userData(recType.getFieldNames()[nullableFieldId]));
}
}
recBuilder.write(out, true);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
index 9a602de..4f8b628 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractJsonDataParser.java
@@ -48,6 +48,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.hyracks.util.ParseUtil;
import com.fasterxml.jackson.core.JsonFactory;
@@ -198,7 +199,7 @@
if (!recordType.isOpen() && fieldIndex < 0) {
throw new RuntimeDataException(ErrorCode.PARSER_ADM_DATA_PARSER_EXTRA_FIELD_IN_CLOSED_RECORD,
- fieldName);
+ LogRedactionUtil.userData(fieldName));
}
valueBuffer.reset();
nextToken();
@@ -213,7 +214,8 @@
//fail fast if the current field is not nullable
if (currentToken() == ADMToken.NULL && !isNullableType(fieldType)) {
- throw new RuntimeDataException(ErrorCode.PARSER_EXT_DATA_PARSER_CLOSED_FIELD_NULL, fieldName);
+ throw new RuntimeDataException(ErrorCode.PARSER_EXT_DATA_PARSER_CLOSED_FIELD_NULL,
+ LogRedactionUtil.userData(fieldName));
}
nullBitMap.set(fieldIndex);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
index bc15f1e..b8a74d9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AbstractNestedDataParser.java
@@ -35,6 +35,7 @@
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
import org.apache.asterix.om.utils.RecordUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.LogRedactionUtil;
/**
* Abstract class for nested formats (ADM, JSON, XML ... etc)
@@ -117,7 +118,7 @@
for (int i = 0; i < recordType.getFieldTypes().length; i++) {
if (!nullBitmap.get(i) && !isMissableType(recordType.getFieldTypes()[i])) {
throw new RuntimeDataException(ErrorCode.PARSER_EXT_DATA_PARSER_CLOSED_FIELD_NULL,
- recordType.getFieldNames()[i]);
+ LogRedactionUtil.userData(recordType.getFieldNames()[i]));
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 590f51d..2936d11 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -51,6 +51,7 @@
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
+import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.hyracks.util.ParseUtil;
public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
@@ -108,7 +109,8 @@
fldIds[i] = recBuilder.getFieldId(name);
if (fldIds[i] < 0) {
if (!recordType.isOpen()) {
- throw new RuntimeDataException(ErrorCode.PARSER_DELIMITED_ILLEGAL_FIELD, name, recordType);
+ throw new RuntimeDataException(ErrorCode.PARSER_DELIMITED_ILLEGAL_FIELD,
+ LogRedactionUtil.userData(name), recordType);
} else {
nameBuffers[i] = new ArrayBackedValueStorage();
str.setValue(name);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
index b970b04..99d3a5b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/TweetParser.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.hyracks.util.string.UTF8StringWriter;
import com.fasterxml.jackson.databind.JsonNode;
@@ -200,7 +201,7 @@
if (obj.get(curFNames[iter1]).isNull() && !(curTypes[iter1] instanceof AUnionType)) {
if (curRecType.isClosedField(curFNames[iter1])) {
throw new RuntimeDataException(ErrorCode.PARSER_EXT_DATA_PARSER_CLOSED_FIELD_NULL,
- curFNames[iter1]);
+ LogRedactionUtil.userData(curFNames[iter1]));
} else {
continue;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java
index 61effa8..c4cc1de 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/KeyFieldTypeUtil.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.util.LogRedactionUtil;
public class KeyFieldTypeUtil {
@@ -323,7 +324,7 @@
} else {
// closed record type and we couldn't find the field -> error.
throw new CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND, sourceLoc,
- RecordUtil.toFullyQualifiedName(path));
+ LogRedactionUtil.userData(RecordUtil.toFullyQualifiedName(path)));
}
}
if (fieldType.getTypeTag() == ATypeTag.UNION) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java
index 463cffb..aa37ebb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeResourceFactoryProvider.java
@@ -39,7 +39,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.asterix.om.utils.RecordUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
@@ -59,6 +59,7 @@
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.util.LogRedactionUtil;
public class RTreeResourceFactoryProvider implements IResourceFactoryProvider {
@@ -82,7 +83,7 @@
indexDetails.getKeyFieldNames().get(0), recordType).first;
if (spatialType == null) {
throw new CompilationException(ErrorCode.COMPILATION_FIELD_NOT_FOUND,
- StringUtils.join(indexDetails.getKeyFieldNames().get(0), '.'));
+ LogRedactionUtil.userData(RecordUtil.toFullyQualifiedName(indexDetails.getKeyFieldNames().get(0))));
}
List<List<String>> primaryKeyFields = dataset.getPrimaryKeys();
int numPrimaryKeys = primaryKeyFields.size();
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
index 077e9b3..d3cb2d7 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/ClosedRecordConstructorResultType.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.util.LogRedactionUtil;
public class ClosedRecordConstructorResultType implements IResultTypeComputer {
@@ -76,7 +77,8 @@
}
for (int j = 0; j < i; j++) {
if (fieldName.equals(fieldNames[j])) {
- throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(), fieldName);
+ throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(),
+ LogRedactionUtil.userData(fieldName));
}
}
fieldTypes[i] = e2Type;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
index 6b08230..838f6f8 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/OpenRecordConstructorResultType.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import org.apache.hyracks.util.LogRedactionUtil;
public class OpenRecordConstructorResultType implements IResultTypeComputer {
@@ -78,7 +79,8 @@
String fieldName = ConstantExpressionUtil.getStringConstant(e1);
if (fieldName != null && t2 != null && TypeHelper.isClosed(t2)) {
if (namesList.contains(fieldName)) {
- throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(), fieldName);
+ throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, f.getSourceLocation(),
+ LogRedactionUtil.userData(fieldName));
}
namesList.add(fieldName);
if (t2.getTypeTag() == ATypeTag.UNION) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
index b1e4021..9985a4c 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
@@ -40,6 +40,7 @@
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.util.LogRedactionUtil;
public class RecordMergeTypeComputer implements IResultTypeComputer {
@@ -111,7 +112,7 @@
// If the ignore duplicates flag is not set, we throw a duplicate field exception
else {
throw new CompilationException(ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME,
- f.getSourceLocation(), fieldNames[i]);
+ f.getSourceLocation(), LogRedactionUtil.userData(fieldNames[i]));
}
}
@@ -144,7 +145,8 @@
private IAType mergedNestedType(String fieldName, IAType fieldType1, IAType fieldType0, SourceLocation sourceLoc)
throws AlgebricksException {
if (fieldType1.getTypeTag() != ATypeTag.OBJECT || fieldType0.getTypeTag() != ATypeTag.OBJECT) {
- throw new CompilationException(ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, sourceLoc, fieldName);
+ throw new CompilationException(ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, sourceLoc,
+ LogRedactionUtil.userData(fieldName));
}
ARecordType resultType = (ARecordType) fieldType0;
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/runtime/RuntimeRecordTypeInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/runtime/RuntimeRecordTypeInfo.java
index ccdfa0e..9d53ab3 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/runtime/RuntimeRecordTypeInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/runtime/RuntimeRecordTypeInfo.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
+import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.hyracks.util.string.UTF8StringUtil;
import org.apache.hyracks.util.string.UTF8StringWriter;
@@ -100,7 +101,8 @@
int j = getFieldIndex(baaos.getByteArray(), serializedFieldNameOffsets[i],
UTF8StringUtil.getStringLength(baaos.getByteArray(), serializedFieldNameOffsets[i]));
if (j != i) {
- throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, fieldNames[i]);
+ throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME,
+ LogRedactionUtil.userData(fieldNames[i]));
}
}
} catch (IOException e) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
index c7b2561..d6cccc0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
@@ -35,4 +35,9 @@
* @return the reusable buffer
*/
ByteBuffer getReusableBuffer();
+
+ /**
+ * @return The remote address of the sender
+ */
+ String getRemoteAddress();
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index e1f99f4..27da909 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -93,6 +93,7 @@
ExecutorService threadExecutor = (ExecutorService) appCtx.getThreadExecutor();
syncFuture = threadExecutor.submit(() -> {
try {
+ Thread.currentThread().setName("Replica " + id.toString() + " Synchronizer");
new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
setStatus(IN_SYNC);
} catch (Exception e) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 004b640..80bb3c8 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -58,7 +57,7 @@
@Override
public void run() {
final String nodeId = appCtx.getServiceContext().getNodeId();
- Thread.currentThread().setName(nodeId + RemoteLogsNotifier.class.getSimpleName());
+ Thread.currentThread().setName(RemoteLogsNotifier.class.getSimpleName() + ":" + nodeId);
while (!Thread.currentThread().isInterrupted()) {
try {
final RemoteLogRecord logRecord = remoteLogsQ.take();
@@ -86,11 +85,9 @@
private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId, int resourcePartition)
throws HyracksDataException {
- final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions();
final Predicate<LocalResource> replicaIndexesPredicate = lr -> {
DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
- return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition
- && !masterPartitions.contains(dls.getPartition());
+ return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition;
};
final Map<Long, LocalResource> resources =
localResourceRep.getResources(replicaIndexesPredicate, Collections.singleton(resourcePartition));
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index 48eb8e3..063709a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -38,7 +38,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -153,7 +152,7 @@
if (!replicationJobsQ.isEmpty()) {
return;
}
- LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
+ LOGGER.trace("no pending replication jobs; closing connections to replicas");
for (ReplicationDestination dest : destinations) {
dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 7f6439c..b38f0aa 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -49,10 +49,10 @@
byteBuffer.clear();
byteBuffer.limit(length);
- while (byteBuffer.remaining() > 0 && socketChannel.read(byteBuffer) > 0);
+ while (byteBuffer.remaining() > 0 && socketChannel.read(byteBuffer) >= 0);
if (byteBuffer.remaining() > 0) {
- throw new EOFException();
+ throw new EOFException("could not read all data from source; remaining bytes: " + byteBuffer.remaining());
}
byteBuffer.flip();
@@ -114,6 +114,7 @@
while (requestBuffer.hasRemaining()) {
socketChannel.write(requestBuffer);
}
+ socketChannel.getSocketChannel().socket().getOutputStream().flush();
}
//unused
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 3dc094e..6c6a10a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -25,6 +25,7 @@
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.ReplicationProperties;
@@ -52,6 +53,7 @@
private ServerSocketChannel serverSocketChannel = null;
private final INcApplicationContext appCtx;
private final RemoteLogsProcessor logsProcessor;
+ private final AtomicInteger replicationWorkerCounter = new AtomicInteger(0);
public ReplicationChannel(INcApplicationContext appCtx) {
this.appCtx = appCtx;
@@ -123,19 +125,25 @@
@Override
public void run() {
- Thread.currentThread().setName("Replication Worker");
+ Thread.currentThread().setName("Replication Worker-" + replicationWorkerCounter.incrementAndGet() + "("
+ + getRemoteAddress() + ")");
try {
if (socketChannel.requiresHandshake() && !socketChannel.handshake()) {
+ LOGGER.warn("failed to complete handshake with {}", this::getRemoteAddress);
return;
}
socketChannel.getSocketChannel().configureBlocking(true);
+ LOGGER.trace("reading replication worker initial request");
ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
+ LOGGER.trace("got request type: {}", requestType);
while (requestType != ReplicationRequestType.GOODBYE) {
handle(requestType);
+ LOGGER.trace("handled request type: {}", requestType);
requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
+ LOGGER.trace("got request type: {}", requestType);
}
} catch (Exception e) {
- LOGGER.warn("Unexpected error during replication.", e);
+ LOGGER.warn("unexpected error during replication.", e);
} finally {
NetworkUtil.closeQuietly(socketChannel);
}
@@ -151,6 +159,15 @@
return outBuffer;
}
+ @Override
+ public String getRemoteAddress() {
+ try {
+ return socketChannel.getSocketChannel().getRemoteAddress().toString();
+ } catch (Exception e) {
+ return "unknown";
+ }
+ }
+
private void handle(ReplicationRequestType requestType) throws HyracksDataException {
final IReplicaTask task =
(IReplicaTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 1e93228..92e4989 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -60,7 +60,7 @@
((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
.invalidateResource(replicaRes.getRelativePath().toString());
}
- LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
+ LOGGER.debug(() -> "Deleted file: " + localFile.getAbsolutePath());
} else {
LOGGER.warn(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
}
@@ -85,6 +85,11 @@
}
}
+ @Override
+ public String toString() {
+ return "DeleteFileTask{" + "file='" + file + '\'' + '}';
+ }
+
public static DeleteFileTask create(DataInput input) throws IOException {
return new DeleteFileTask(input.readUTF());
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index 1ea076d..fa77378 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -37,12 +37,16 @@
import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.util.ThreadDumpUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* A task to mark a replicated LSM component as valid
*/
public class MarkComponentValidTask implements IReplicaTask {
+ private static final Logger LOGGER = LogManager.getLogger();
private final long masterLsn;
private final long lastComponentId;
private final String file;
@@ -90,7 +94,10 @@
// wait until the lsn mapping is flushed to disk
while (!indexCheckpointManager.isFlushed(masterLsn)) {
if (replicationTimeOut <= 0) {
- throw new ReplicationException(new TimeoutException("Couldn't receive flush lsn from master"));
+ LOGGER.warn("{} seconds passed without receiving flush lsn {} from master for component {}",
+ appCtx.getReplicationProperties().getReplicationTimeOut(), masterLsn, file);
+ LOGGER.debug("thread dump on receiving flush lsn timeout {}", ThreadDumpUtil::takeDumpString);
+ throw new ReplicationException(new TimeoutException("couldn't receive flush lsn from master"));
}
final long startTime = System.nanoTime();
indexCheckpointManager.wait(replicationTimeOut);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index d9b3b0c..0f5949e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -33,12 +33,15 @@
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* A task to get the list of the files in a partition on a replica
*/
public class PartitionResourcesListTask implements IReplicaTask {
+ private static final Logger LOGGER = LogManager.getLogger();
private final int partition;
public PartitionResourcesListTask(int partition) {
@@ -47,20 +50,25 @@
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
+ LOGGER.debug("processing {}", this);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
+ LOGGER.debug("cleaned up partition {}", partition);
final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
// .metadata file -> resource id
Map<String, Long> partitionReplicatedResources =
localResourceRepository.getPartitionReplicatedResources(partition, replicationStrategy);
+ LOGGER.debug("got partition {} resources", partition);
// all data files in partitions + .metadata files
final List<String> partitionFiles =
localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+ LOGGER.debug("got partition {} files ({})", partition, partitionFiles.size());
final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition,
partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOrigin(partition));
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
+ LOGGER.debug("sent partition {} files list to requester", partition);
}
@Override
@@ -78,6 +86,11 @@
}
}
+ @Override
+ public String toString() {
+ return "PartitionResourcesListTask{" + "partition=" + partition + '}';
+ }
+
public static PartitionResourcesListTask create(DataInput input) throws HyracksDataException {
try {
int partition = input.readInt();
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
index d9357df..b67a71e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
@@ -34,12 +34,15 @@
import org.apache.asterix.replication.management.ReplicationChannel;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* A task to replicate transaction logs from master replica
*/
public class ReplicateLogsTask implements IReplicaTask {
+ private static final Logger LOGGER = LogManager.getLogger();
public static final int END_REPLICATION_LOG_SIZE = 1;
private final String nodeId;
@@ -61,6 +64,7 @@
logsBuffer = ReplicationProtocol.readRequest(channel, logsBuffer);
// check if it is end of handshake
if (logsBuffer.remaining() == END_REPLICATION_LOG_SIZE) {
+ LOGGER.debug("ending log replication with {}", worker.getRemoteAddress());
break;
}
logsProcessor.process(logsBuffer, reusableLog, worker);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
index 73fca9c..0e27a51 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
@@ -58,7 +58,7 @@
String masterNode = appCtx.getReplicaManager().isPartitionOrigin(replica.getIdentifier().getPartition())
? appCtx.getServiceContext().getNodeId() : null;
ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata, masterNode);
- LOGGER.debug("attempting to replicate {} to replica {}", task, replica);
+ LOGGER.trace("attempting {} to replica {}", task, replica);
ReplicationProtocol.sendTo(replica, task);
// send the file itself
try (RandomAccessFile fromFile = new RandomAccessFile(filePath.getFile(), "r");
@@ -66,6 +66,7 @@
NetworkingUtil.sendFile(fileChannel, channel);
}
ReplicationProtocol.waitForAck(replica);
+ LOGGER.debug("completed {} to replica {}", task, replica);
} catch (IOException e) {
throw new ReplicationException(e);
}
@@ -74,8 +75,10 @@
public void delete(String file) {
try {
final DeleteFileTask task = new DeleteFileTask(file);
+ LOGGER.trace("attempting {} from replica {}", task, replica);
ReplicationProtocol.sendTo(replica, task);
ReplicationProtocol.waitForAck(replica);
+ LOGGER.debug("completed {} from replica {}", task, replica);
} catch (IOException e) {
throw new ReplicationException(e);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 44c9404..5d217a4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -70,7 +70,9 @@
if (!deltaRecovery) {
deletePartitionFromReplica(partition);
}
+ LOGGER.trace("getting replica files");
PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
+ LOGGER.trace("got replica files");
Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOrigin());
// clean up files for invalid resources (deleted or recreated while the replica was down)
@@ -82,6 +84,7 @@
final Set<String> masterFiles =
localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+ LOGGER.trace("got master partition files");
// exclude from the replica files the list of invalid deleted files
final Set<String> replicaFiles = new HashSet<>(replicaResourceResponse.getFiles());
replicaFiles.removeAll(deletedReplicaFiles);
@@ -131,8 +134,8 @@
// sort files to ensure index metadata files starting with "." are deleted last
files.sort(String::compareTo);
Collections.reverse(files);
- LOGGER.info("deleting {}", files);
files.forEach(sync::delete);
+ LOGGER.debug("completed invalid files deletion");
}
private long getResourceMasterValidSeq(ResourceReference rr) throws HyracksDataException {
@@ -169,7 +172,7 @@
}
}
if (!invalidFiles.isEmpty()) {
- LOGGER.info("will delete the following files from replica {}", invalidFiles);
+ LOGGER.debug("will delete the following files from replica {}", invalidFiles);
deleteInvalidFiles(new ArrayList<>(invalidFiles));
}
return invalidFiles;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 0d0ef19..459ff01 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -28,6 +28,8 @@
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* Performs the steps required to ensure any newly added replica
@@ -35,6 +37,7 @@
*/
public class ReplicaSynchronizer {
+ private static final Logger LOGGER = LogManager.getLogger();
private final INcApplicationContext appCtx;
private final PartitionReplica replica;
@@ -44,16 +47,23 @@
}
public void sync(boolean register, boolean deltaRecovery) throws IOException {
+ LOGGER.debug("starting replica sync process for replica {}", replica);
Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
synchronized (partitionLock) {
+ LOGGER.trace("acquired partition replica lock");
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
// suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
checkpointManager.suspend();
+ LOGGER.debug("starting replica files sync");
syncFiles(deltaRecovery);
+ LOGGER.debug("completed replica files sync");
checkpointReplicaIndexes();
+ LOGGER.debug("replica indexes checkpoint completed");
if (register) {
+ LOGGER.debug("registering replica");
appCtx.getReplicationManager().register(replica);
+ LOGGER.debug("replica registered");
}
} finally {
checkpointManager.resume();
@@ -68,6 +78,7 @@
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy,
p -> p == replica.getIdentifier().getPartition());
waitForReplicatedDatasetsIO();
+ LOGGER.debug("flushed partition datasets");
fileSync.sync();
}
@@ -77,6 +88,7 @@
appCtx.getReplicaManager().isPartitionOrigin(partition) ? appCtx.getServiceContext().getNodeId() : null;
CheckpointPartitionIndexesTask task =
new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition), masterNode);
+ LOGGER.debug("asking replica to checkpoint indexes");
ReplicationProtocol.sendTo(replica, task);
ReplicationProtocol.waitForAck(replica);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 27e753f..bb3cde5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -153,33 +153,38 @@
@SuppressWarnings("squid:S1181")
@Override
- public synchronized void insert(LocalResource resource) throws HyracksDataException {
- String relativePath = getFileName(resource.getPath());
- FileReference resourceFile = ioManager.resolve(relativePath);
- if (resourceFile.getFile().exists()) {
- throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
- }
+ public void insert(LocalResource resource) throws HyracksDataException {
+ FileReference resourceFile;
+ synchronized (this) {
+ String relativePath = getFileName(resource.getPath());
+ resourceFile = ioManager.resolve(relativePath);
+ if (resourceFile.getFile().exists()) {
+ throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+ }
- final File parent = resourceFile.getFile().getParentFile();
- if (!parent.exists() && !parent.mkdirs()) {
- throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
+ final File parent = resourceFile.getFile().getParentFile();
+ if (!parent.exists() && !parent.mkdirs()) {
+ throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
+ }
+ // The next block should be all or nothing
+ try {
+ createResourceFileMask(resourceFile);
+ byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
+ FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(
+ UNINITIALIZED_COMPONENT_SEQ, 0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
+ deleteResourceFileMask(resourceFile);
+ } catch (Exception e) {
+ cleanup(resourceFile);
+ throw HyracksDataException.create(e);
+ } catch (Throwable th) {
+ LOGGER.error("Error creating resource {}", resourceFile, th);
+ ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES);
+ }
+ resourceCache.put(resource.getPath(), resource);
}
- // The next block should be all or nothing
- try {
- createResourceFileMask(resourceFile);
- byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
- FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(UNINITIALIZED_COMPONENT_SEQ,
- 0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
- deleteResourceFileMask(resourceFile);
- } catch (Exception e) {
- cleanup(resourceFile);
- throw HyracksDataException.create(e);
- } catch (Throwable th) {
- LOGGER.error("Error creating resource {}", resourceFile, th);
- ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES);
- }
- resourceCache.put(resource.getPath(), resource);
+ // do not perform the replication operation inside the synchronized block to avoid blocking
+ // other threads on network operations
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled) {
try {
@@ -203,25 +208,30 @@
}
@Override
- public synchronized void delete(String relativePath) throws HyracksDataException {
+ public void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
- try {
- if (resourceFile.getFile().exists()) {
- try {
- createReplicationJob(ReplicationOperation.DELETE, resourceFile);
- } catch (Exception e) {
- LOGGER.error("failed to delete resource file {} from replicas", resourceFile);
- }
- final LocalResource localResource = readLocalResource(resourceFile.getFile());
- IoUtil.delete(resourceFile);
- // delete all checkpoints
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
- } else {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
- relativePath);
+ boolean resourceExists = resourceFile.getFile().exists();
+ if (resourceExists) {
+ try {
+ createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+ } catch (Exception e) {
+ LOGGER.error("failed to delete resource file {} from replicas", resourceFile);
}
- } finally {
- invalidateResource(relativePath);
+ }
+ synchronized (this) {
+ try {
+ if (resourceExists) {
+ final LocalResource localResource = readLocalResource(resourceFile.getFile());
+ IoUtil.delete(resourceFile);
+ // delete all checkpoints
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
+ } else {
+ throw HyracksDataException
+ .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath);
+ }
+ } finally {
+ invalidateResource(relativePath);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
index f9bf5c7..9e052f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -82,9 +82,9 @@
@Override
public synchronized int read(ByteBuffer buffer) throws IOException {
- int transfereeBytes = 0;
+ int transferredBytes = 0;
if (cachedData) {
- transfereeBytes += transferTo(inAppData, buffer);
+ transferredBytes += transferTo(inAppData, buffer);
}
if (buffer.hasRemaining()) {
if (!partialRecord) {
@@ -97,17 +97,18 @@
inAppData.clear();
if (decrypt() > 0) {
inAppData.flip();
- transfereeBytes += transferTo(inAppData, buffer);
+ transferredBytes += transferTo(inAppData, buffer);
} else {
inAppData.limit(0);
}
} else if (bytesRead < 0) {
+ LOGGER.trace("received EOF; transferred bytes: {}", transferredBytes);
handleEndOfStreamQuietly();
return -1;
}
}
cachedData = inAppData.hasRemaining();
- return transfereeBytes;
+ return transferredBytes;
}
private int decrypt() throws IOException {
@@ -127,7 +128,7 @@
break;
case CLOSED:
close();
- return -1;
+ return decryptedBytes;
default:
throw new IllegalStateException("Invalid SSL result status: " + result.getStatus());
}
@@ -192,6 +193,9 @@
engine.closeOutbound();
try {
new SslHandshake(this).handshake();
+ } catch (Exception e) {
+ // ignore exceptions on best effort graceful close handshake
+ LOGGER.trace("ssl socket close handshake failed", e);
} finally {
socketChannel.close();
}
@@ -239,7 +243,8 @@
close();
}
} catch (Exception e) {
- LOGGER.warn("failed to close socket gracefully", e);
+ // ignore close exception since we are closing quietly
+ LOGGER.trace("failed to close socket gracefully", e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 4f0c3a8..b2cd435 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -57,7 +57,8 @@
try {
closeable.close();
} catch (IOException e) {
- LOGGER.warn("Failed to close", e);
+ // ignore since we are closing quietly
+ LOGGER.trace("failed to close", e);
}
}
}