Merge branch 'gerrit/trinity' into 'master'
Change-Id: Ia4c6bdfa9dce9437049d59e137494b2372348754
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 41f2a53..b2134dc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -67,6 +67,9 @@
CancelQueryRequest cancelQueryMessage = new CancelQueryRequest(serviceCtx.getNodeId(),
cancelQueryFuture.getFutureId(), uuid, clientContextId);
// TODO(mblow): multicc -- need to send cancellation to the correct cc
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Sending CancelQueryRequest with uuid:{}, clientContextID:{}", uuid, clientContextId);
+ }
messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
CancelQueryResponse cancelResponse =
(CancelQueryResponse) cancelQueryFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index d2d7a9b..695956c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -165,7 +165,8 @@
CancelQueryRequest cancelQueryMessage =
new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), uuid, clientContextID);
// TODO(mblow): multicc -- need to send cancellation to the correct cc
- LOGGER.info("Cancelling query due to {}", exception.getClass().getSimpleName());
+ LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due to {}", uuid, clientContextID,
+ exception.getClass().getSimpleName());
messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
if (wait) {
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index ddd3d64..3fd339e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -195,12 +195,12 @@
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "the job {} finished", jobId);
+ LOGGER.log(level, "Active job {} finished", jobId);
}
JobId lastJobId = jobId;
if (numRegistered != numDeRegistered) {
LOGGER.log(Level.WARN,
- "the job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
+ "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
numRegistered, numDeRegistered);
}
jobId = null;
@@ -208,7 +208,7 @@
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "The job finished with status: {}", jobStatus);
+ LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus);
}
if (!jobSuccessfullyTerminated(jobStatus)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
@@ -372,7 +372,7 @@
@Override
public synchronized void recover() {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Recover is called on " + entityId);
+ LOGGER.log(level, "Recover is called on {}", entityId);
}
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
@@ -456,7 +456,7 @@
try {
metadataProvider.getApplicationContext().getHcc().cancelJob(jobId);
} catch (Throwable th) {
- LOGGER.warn("Failed to cancel active job", th);
+ LOGGER.warn("Failed to cancel active job {}", jobId, th);
e.addSuppressed(th);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 284929f..6b3581e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -66,22 +66,24 @@
@Override
protected void handle(ActiveEvent event) {
- EntityId entityId = jobId2EntityId.get(event.getJobId());
+ JobId jobId = event.getJobId();
+ Kind eventKind = event.getEventKind();
+ EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Next event is of type " + event.getEventKind());
+ LOGGER.log(level, "Next event is {} for job {}", eventKind, jobId);
}
- if (event.getEventKind() == Kind.JOB_FINISHED) {
- LOGGER.log(level, "Removing the job");
- jobId2EntityId.remove(event.getJobId());
+ if (eventKind == Kind.JOB_FINISHED) {
+ LOGGER.log(level, "Removing job {}", jobId);
+ jobId2EntityId.remove(jobId);
}
if (listener != null) {
LOGGER.log(level, "Notifying the listener");
listener.notify(event);
}
} else {
- LOGGER.log(Level.ERROR, "Entity not found for received message for job " + event.getJobId());
+ LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId);
}
}
@@ -90,45 +92,43 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level,
- "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = "
- + jobId);
+ LOGGER.log(level, "notifyJobCreation was called for job {}", jobId);
}
Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (!(property instanceof EntityId)) {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job is not of type active job. property found to be: " + property);
+ LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property);
}
return;
}
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
boolean found = jobId2EntityId.get(jobId) != null;
- LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
+ LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
}
private synchronized void monitorJob(JobId jobId, EntityId entityId) {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
+ LOGGER.log(level, "monitorJob was called for job {}", jobId);
}
boolean found = jobId2EntityId.get(jobId) != null;
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
+ LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
}
if (entityEventListeners.containsKey(entityId)) {
if (jobId2EntityId.containsKey(jobId)) {
if (LOGGER.isErrorEnabled()) {
- LOGGER.error("Job is already being monitored for job: " + jobId);
+ LOGGER.error("Job {} is already being monitored", jobId);
}
return;
}
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "monitoring started for job id: " + jobId);
+ LOGGER.log(level, "Monitoring started for job {}", jobId);
}
} else {
if (LOGGER.isEnabled(level)) {
- LOGGER.info("No listener was found for the entity: " + entityId);
+ LOGGER.log(level, "No listener was found for the entity {} for job {}", entityId, jobId);
}
}
jobId2EntityId.put(jobId, entityId);
@@ -146,14 +146,14 @@
public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
throws HyracksException {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Getting notified of job finish for JobId: " + jobId);
+ LOGGER.log(level, "Getting notified of job finish for job {}", jobId);
}
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
} else {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "no need to notify job finish");
+ LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", jobId);
}
}
}
@@ -169,11 +169,11 @@
@Override
public IActiveEntityEventsListener getListener(EntityId entityId) {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
+ LOGGER.log(level, "getActiveEntityListener was called with entity {}", entityId);
}
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Listener found: " + listener);
+ LOGGER.log(level, "Listener found: {}", listener);
}
- return entityEventListeners.get(entityId);
+ return listener;
}
@@ -192,8 +192,7 @@
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
- + listener.getEntityId());
+ LOGGER.log(level, "registerListener was called for the entity {}", listener.getEntityId());
}
if (entityEventListeners.containsKey(listener.getEntityId())) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId());
@@ -207,8 +206,7 @@
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
- + listener.getEntityId());
+ LOGGER.log(level, "unregisterListener was called for the entity {}", listener.getEntityId());
}
IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
if (registeredListener == null) {
@@ -226,7 +224,7 @@
for (IActiveEntityEventsListener listener : getEventListeners()) {
synchronized (listener) {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getState());
+ LOGGER.log(level, "Entity {} is {}", listener.getEntityId(), listener.getState());
}
listener.notifyAll();
}
@@ -276,11 +274,11 @@
public void resumeOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider) {
try {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Resuming " + listener.getEntityId());
+ LOGGER.log(level, "Resuming {}", listener.getEntityId());
}
((ActiveEntityEventsListener) listener).resume(metadataProvider);
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, listener.getEntityId() + " resumed");
+ LOGGER.log(level, "{} resumed", listener.getEntityId());
}
} catch (Throwable th) { // NOSONAR must halt in case of any failure
LOGGER.error("Resume active failed", th);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 6154faa..65d1039 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -50,6 +50,7 @@
final IRequestTracker requestTracker = appCtx.getRequestTracker();
IClientRequest req = uuid != null ? requestTracker.get(uuid) : requestTracker.getByClientContextId(contextId);
RequestStatus status;
+ String requestId = "";
if (req == null) {
LOGGER.log(Level.INFO, "No request found for uuid {} or context id {}", uuid, contextId);
@@ -59,7 +60,8 @@
status = RequestStatus.REJECTED;
} else {
try {
- requestTracker.cancel(req.getId());
+ requestId = req.getId();
+ requestTracker.cancel(requestId);
status = RequestStatus.SUCCESS;
} catch (Exception e) {
LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
@@ -67,6 +69,10 @@
}
}
}
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, uuid:{}, contextId:{}, status:{}", nodeId,
+ requestId, uuid, contextId, status);
+ }
CancelQueryResponse response = new CancelQueryResponse(reqId, status);
CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 0a5e033..05bc87b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -243,7 +243,7 @@
@Override
public String toString() {
- return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,
- LogRedactionUtil.statement(statementsText));
+ return String.format("%s(id=%s, from=%s, uuid=%s): %s", getClass().getSimpleName(), requestMessageId,
+ requestNodeId, requestReference.getUuid(), LogRedactionUtil.statement(statementsText));
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 9802001..bb689ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -41,6 +41,8 @@
@Override
public void operationFailed(ILSMIOOperation operation, Throwable t) {
LOGGER.error("Operation {} has failed", operation, t);
- ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+ if (operation.getIOOpertionType() != ILSMIOOperation.LSMIOOperationType.REPLICATE) {
+ ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+ }
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 56f794b..5d80771 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -492,18 +492,10 @@
final INetworkSecurityManager networkSecurityManager =
ncServiceContext.getControllerService().getNetworkSecurityManager();
- // clients need to have the client factory on their classpath- to enable older clients, only use
- // our client socket factory when SSL is enabled
- if (networkSecurityManager.getConfiguration().isSslEnabled()) {
- final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
- final RMIClientFactory clientSocketFactory =
- new RMIClientFactory(networkSecurityManager.getConfiguration());
- metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
- getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory);
- } else {
- metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
- getMetadataProperties().getMetadataPort());
- }
+ metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+ getMetadataProperties().getMetadataPort(),
+ RMIClientFactory.getSocketFactory(networkSecurityManager),
+ RMIServerFactory.getSocketFactory(networkSecurityManager));
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 21ce833..d41b549 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4813,6 +4813,10 @@
}
}
jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("createAndRunJob jobId:{}, uuid:{}", jobId,
+ requestParameters.getRequestReference().getUuid());
+ }
clientRequest.setJobId(jobId);
if (jId != null) {
jId.setValue(jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
index e8683c9..c366977 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
@@ -30,7 +30,7 @@
public interface IDatasetRebalanceCallback {
/**
- * The action to perform before the target dataset is populated.
+ * The check to perform before the target dataset is populated.
*
* @param metadataProvider,
* the metadata provider.
@@ -40,9 +40,13 @@
* the target dataset.
* @param hcc,
* the hyracks client connection.
+ *
+ * @return <code>true</code> if the rebalance of the dataset should proceed, otherwise <code>false</code> to skip.
+ * If the dataset is skipped, the active metadata transaction context, if any, can be expected to be aborted.
+ *
* @throws HyracksDataException
*/
- void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+ boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
IHyracksClientConnection hcc) throws HyracksDataException;
/**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
index 680adbf..7085567 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
@@ -33,9 +33,10 @@
}
@Override
- public void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+ public boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
IHyracksClientConnection hcc) {
// Does nothing.
+ return true;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 1b0b4fd..837d2cb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -20,6 +20,8 @@
import static org.apache.asterix.app.translator.QueryTranslator.abort;
import static org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
+import static org.apache.asterix.metadata.utils.DatasetUtil.getFullyQualifiedDisplayName;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -88,13 +90,15 @@
* @param targetNcNames, the list of target nodes.
* @param metadataProvider, the metadata provider.
* @param hcc, the reusable hyracks connection.
+ * @return <code>false</code> if the rebalance was skipped by the rebalance callback, otherwise <code>true</code>
* @throws Exception
*/
- public static void rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
+ public static boolean rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IDatasetRebalanceCallback datasetRebalanceCallback, boolean forceRebalance) throws Exception {
Dataset sourceDataset;
Dataset targetDataset;
+ boolean success = true;
// Executes the first Metadata transaction.
// Generates the rebalance target files. While doing that, hold read locks on the dataset so
// that no one can drop the rebalance source dataset.
@@ -106,13 +110,13 @@
// If the source dataset doesn't exist, then it's a no-op.
if (sourceDataset == null) {
- return;
+ return true;
}
Set<String> sourceNodes = new HashSet<>(metadataProvider.findNodes(sourceDataset.getNodeGroupName()));
if (!forceRebalance && sourceNodes.equals(targetNcNames)) {
- return;
+ return true;
}
if (!targetNcNames.isEmpty()) {
@@ -123,20 +127,25 @@
// The target dataset for rebalance.
targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
- LOGGER.info("Rebalancing dataset {} from node group {} with nodes {} to node group {} with nodes {}",
- sourceDataset.getDatasetName(), sourceDataset.getNodeGroupName(), sourceNodes,
- targetDataset.getNodeGroupName(), targetNcNames);
+ LOGGER.info("Rebalancing {} {} from node group {} with nodes {} to node group {} with nodes {}",
+ dataset(), getFullyQualifiedDisplayName(sourceDataset), sourceDataset.getNodeGroupName(),
+ sourceNodes, targetDataset.getNodeGroupName(), targetNcNames);
// Rebalances the source dataset into the target dataset.
if (sourceDataset.getDatasetType() != DatasetType.EXTERNAL) {
- rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
+ success = rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
}
} else {
targetDataset = null;
// if this the last NC in the cluster, just drop the dataset
purgeDataset(sourceDataset, metadataProvider, hcc);
}
- // Complete the metadata transaction.
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ if (success) {
+ // Complete the metadata transaction.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } else {
+ // Abort the metadata transaction, since the rebalance of the dataset was skipped
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
} catch (Exception e) {
abort(e, e, mdTxnCtx);
throw e;
@@ -144,7 +153,10 @@
if (targetNcNames.isEmpty()) {
// Nothing else to do since the dataset was dropped.
- return;
+ return true;
+ } else if (!success) {
+ LOGGER.info("Dataset {} rebalance was skipped, see above log for reason", datasetName);
+ return false;
}
// Up to this point, since the bulk part of a rebalance operation is done,
// the following two operations will retry after interrupt and finally rethrow InterruptedException,
@@ -163,6 +175,7 @@
runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc));
});
LOGGER.info("Dataset {} rebalance completed successfully", datasetName);
+ return true;
}
@FunctionalInterface
@@ -212,13 +225,16 @@
}
// Rebalances from the source to the target.
- private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
+ private static boolean rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
// Drops the target dataset files (if any) to make rebalance idempotent.
dropDatasetFiles(target, metadataProvider, hcc);
// Performs the specified operation before the target dataset is populated.
- datasetRebalanceCallback.beforeRebalance(metadataProvider, source, target, hcc);
+ if (!datasetRebalanceCallback.canRebalance(metadataProvider, source, target, hcc)) {
+ // the callback indicates that this rebalance should be skipped; short circuit the remaining steps
+ return false;
+ }
// Creates the rebalance target.
createRebalanceTarget(target, metadataProvider, hcc);
@@ -231,6 +247,8 @@
// Performs the specified operation after the target dataset is populated.
datasetRebalanceCallback.afterRebalance(metadataProvider, source, target, hcc);
+
+ return true;
}
// Switches the metadata entity from the source dataset to the target dataset.
diff --git a/asterixdb/asterix-app/src/main/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/main/resources/cc-cloud-storage.conf
new file mode 100644
index 0000000..be9bd54
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/resources/cc-cloud-storage.conf
@@ -0,0 +1,68 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=asterix_nc1/iodevice1
+iodevices=asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=asterix_nc2/iodevice1
+iodevices=asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+storage.io.scheduler=greedy
+storage.filtered.memorycomponent.max.size=16MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.cbo=false
+compiler.cbotest=true
+compiler.queryplanshape=zigzag
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.sort.parallel=false
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
diff --git a/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf b/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf
index 08d9dfd..9884116 100644
--- a/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf
@@ -51,6 +51,9 @@
[common]
log.dir = logs/
log.level = INFO
+compiler.cbo=false
+compiler.cbotest=true
+compiler.queryplanshape=zigzag
compiler.framesize=32KB
compiler.sortmemory=320KB
compiler.groupmemory=160KB
@@ -62,4 +65,5 @@
messaging.frame.size=4096
messaging.frame.count=512
storage.buffercache.pagesize=32KB
-storage.partitioning=static
\ No newline at end of file
+storage.partitioning=static
+storage.compression.block=snappy
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 091fc20..05b8bbf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -214,8 +214,8 @@
result = OBJECT_READER.readValue(resultStr);
} catch (Exception e) {
// whoops, not JSON (e.g. 404) - just include the body
- GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, resultStr);
- throw new Exception(resultStr);
+ GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, "result: {} json parse exception: {}", resultStr, e);
+ throw new Exception(resultStr, e);
}
final boolean isJsonFormat = isJsonFormat(fmt);
diff --git a/asterixdb/asterix-app/src/test/resources/cc-analyze.conf b/asterixdb/asterix-app/src/test/resources/cc-analyze.conf
index 9d45ddb..16fa0ae 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-analyze.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-analyze.conf
@@ -53,4 +53,4 @@
log.dir = logs/
log.level = INFO
compiler.groupmemory=64MB
-storage.buffercache.pagesize=32KB
\ No newline at end of file
+storage.buffercache.pagesize=32KB
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
index 7d0b06b..6373eae 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
@@ -62,10 +62,3 @@
messaging.frame.count=512
cloud.deployment=true
storage.buffercache.pagesize=32KB
-storage.partitioning=static
-cloud.storage.scheme=s3
-cloud.storage.bucket=cloud-storage-container
-cloud.storage.region=us-west-2
-cloud.storage.endpoint=http://127.0.0.1:8001
-cloud.storage.anonymous.auth=true
-cloud.storage.cache.policy=lazy
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index ac66f33..070fe65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -162,12 +162,14 @@
void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException;
/**
- * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
+ * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy} and
+ * {@code partition}.
*
* @param replicationStrategy
+ * @param partition
* @throws HyracksDataException
*/
- void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+ void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException;
/**
* @return the current datasets io stats
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5964bb4..7d3dba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -33,17 +33,19 @@
private static final Logger LOGGER = LogManager.getLogger();
protected final int datasetID;
protected final DatasetInfo dsInfo;
+ protected final int partition;
- public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
+ public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int partition) {
this.datasetID = datasetID;
this.dsInfo = dsInfo;
+ this.partition = partition;
}
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.declareActiveIOOperation(REPLICATE);
+ dsInfo.declareActiveIOOperation(REPLICATE, partition);
}
}
@@ -59,7 +61,7 @@
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.REPLICATE) {
- dsInfo.undeclareActiveIOOperation(REPLICATE);
+ dsInfo.undeclareActiveIOOperation(REPLICATE, partition);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index d15d9be..87a3c2f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -33,12 +33,16 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private static final Logger LOGGER = LogManager.getLogger();
// partition -> index
private final Map<Integer, Set<IndexInfo>> partitionIndexes;
// resourceID -> index
private final Map<Long, IndexInfo> indexes;
+ private final Int2IntMap partitionPendingIO;
private final int datasetID;
private final ILogManager logManager;
private final LogRecord waitLog = new LogRecord();
@@ -54,6 +58,7 @@
public DatasetInfo(int datasetID, ILogManager logManager) {
this.partitionIndexes = new HashMap<>();
this.indexes = new HashMap<>();
+ this.partitionPendingIO = new Int2IntOpenHashMap();
this.setLastAccess(-1);
this.datasetID = datasetID;
this.setRegistered(false);
@@ -74,7 +79,8 @@
setLastAccess(System.currentTimeMillis());
}
- public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+ public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+ partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) + 1);
numActiveIOOps++;
switch (opType) {
case FLUSH:
@@ -91,7 +97,8 @@
}
}
- public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+ public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+ partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) - 1);
numActiveIOOps--;
switch (opType) {
case FLUSH:
@@ -253,6 +260,37 @@
}
}
+ /**
+ * Writes {@code waitLog} through the log manager, then blocks until {@code partition} of this dataset has no
+ * pending IO operations.
+ *
+ * @param partition the partition whose pending IO operations are awaited
+ * @throws HyracksDataException if the calling thread is interrupted while waiting
+ * @throws IllegalStateException if the pending IO count of {@code partition} is negative
+ */
+ public void waitForIO(int partition) throws HyracksDataException {
+ logManager.log(waitLog);
+ synchronized (this) {
+ int pendingIO = partitionPendingIO.getOrDefault(partition, 0);
+ while (pendingIO > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ // re-read the count after every wake-up
+ pendingIO = partitionPendingIO.getOrDefault(partition, 0);
+ }
+ if (pendingIO < 0) {
+ LOGGER.error("number of IO operations cannot be negative for dataset {}, partition {}", this,
+ partition);
+ throw new IllegalStateException(
+ "Number of IO operations cannot be negative: " + this + ", partition " + partition);
+ }
+ }
+ }
+
public synchronized int getPendingFlushes() {
return pendingFlushes;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 7a109cc..34f5114 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -559,10 +559,10 @@
}
@Override
- public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+ public void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
- dsr.getDatasetInfo().waitForIO();
+ dsr.getDatasetInfo().waitForIO(partition);
}
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 3ec2d22..2e32fb3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -58,7 +58,6 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
-import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.annotations.NotThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -66,7 +65,6 @@
@NotThreadSafe
public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
private static final Logger LOGGER = LogManager.getLogger();
- private final int partition;
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
private final ILogManager logManager;
@@ -80,8 +78,7 @@
public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
- super(datasetID, dsInfo);
- this.partition = partition;
+ super(datasetID, dsInfo, partition);
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
this.idGenerator = idGenerator;
@@ -161,10 +158,10 @@
}
}
if (primaryLsmIndex == null) {
- LOGGER.fatal(
- "Primary index not found in dataset {} and partition {} open indexes {}; halting to clear memory state",
+ LOGGER.warn(
+ "Primary index not found in dataset {} and partition {} open indexes {}; possible secondary index leaked files",
dsInfo.getDatasetID(), partition, indexes);
- ExitUtil.halt(ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES);
+ return;
}
for (ILSMIndex lsmIndex : indexes) {
ILSMOperationTracker opTracker = lsmIndex.getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 9ab4848..150d5ee 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -72,6 +72,7 @@
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
protected final DatasetInfo dsInfo;
protected final ILSMIndex lsmIndex;
+ private final int partition;
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
@@ -82,6 +83,7 @@
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ this.partition = ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
componentIds.add(componentId);
}
@@ -257,7 +259,7 @@
@Override
public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
- dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
+ dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), partition);
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
pendingFlushes++;
FlushOperation flush = (FlushOperation) operation;
@@ -280,7 +282,7 @@
pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
}
}
- dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
+ dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), partition);
}
public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index db0911b..33d513f 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -77,6 +77,7 @@
String indexId = "mockIndexId";
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+ Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
DatasetInfo dsInfo = new DatasetInfo(101, null);
LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -140,6 +141,7 @@
ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+ Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -161,6 +163,7 @@
ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+ Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -221,4 +224,8 @@
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
return indexCheckpointManagerProvider;
}
+
+ private static String getIndexPath() {
+ return "storage/partition_0/dataverse/dataset/0/index";
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
index 515e763..ce459e2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
@@ -29,6 +29,7 @@
import javax.net.ssl.SSLSocketFactory;
import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.ipc.security.NetworkSecurityManager;
public class RMIClientFactory implements RMIClientSocketFactory, Serializable {
@@ -37,11 +38,21 @@
private final INetworkSecurityConfig config;
private transient SocketFactory socketFactory;
- public RMIClientFactory(INetworkSecurityConfig config) {
+ private RMIClientFactory(INetworkSecurityConfig config) {
this.config = config;
}
+ public static RMIClientSocketFactory getSocketFactory(INetworkSecurityManager securityManager) {
+ // clients must have this client factory on their classpath; to keep older clients working, only use
+ // our client socket factory when SSL is enabled
+ INetworkSecurityConfig config = securityManager.getConfiguration();
+ if (config.isSslEnabled()) {
+ return new RMIClientFactory(config);
+ }
+ return null;
+ }
+
public Socket createSocket(String host, int port) throws IOException {
synchronized (this) {
if (socketFactory == null) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java
index 9506c5a..0128a87 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java
@@ -19,8 +19,10 @@
package org.apache.asterix.metadata;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.ServerSocket;
import java.rmi.server.RMIServerSocketFactory;
+import java.util.Optional;
import javax.net.ServerSocketFactory;
@@ -28,17 +30,34 @@
public class RMIServerFactory implements RMIServerSocketFactory {
+ // default backlog used by the JDK (e.g. sun.security.ssl.SSLServerSocketFactoryImpl)
+ private static final int DEFAULT_BACKLOG = 50;
private final INetworkSecurityManager securityManager;
- public RMIServerFactory(INetworkSecurityManager securityManager) {
+ private RMIServerFactory(INetworkSecurityManager securityManager) {
this.securityManager = securityManager;
}
+ public static RMIServerSocketFactory getSocketFactory(INetworkSecurityManager securityManager) {
+ if (securityManager.getConfiguration().isSslEnabled()) {
+ return new RMIServerFactory(securityManager);
+ }
+ return null;
+ }
+
@Override
public ServerSocket createServerSocket(int port) throws IOException {
+ ServerSocketFactory socketFactory;
if (securityManager.getConfiguration().isSslEnabled()) {
- return securityManager.newSSLContext().getServerSocketFactory().createServerSocket(port);
+ socketFactory = securityManager.newSSLContext().getServerSocketFactory();
+ } else {
+ socketFactory = ServerSocketFactory.getDefault();
}
- return ServerSocketFactory.getDefault().createServerSocket(port);
+ Optional<InetAddress> rmiBindAddress = securityManager.getConfiguration().getRMIBindAddress();
+ if (rmiBindAddress.isPresent()) {
+ return socketFactory.createServerSocket(port, DEFAULT_BACKLOG, rmiBindAddress.get());
+ } else {
+ return socketFactory.createServerSocket(port);
+ }
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 2104fdf..cedcccf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -43,18 +43,9 @@
public static IAsterixStateProxy registerRemoteObject(INetworkSecurityManager networkSecurityManager,
int metadataCallbackPort) throws RemoteException {
- IAsterixStateProxy stub;
- // clients need to have the client factory on their classpath- to enable older clients, only use
- // our client socket factory when SSL is enabled
- if (networkSecurityManager.getConfiguration().isSslEnabled()) {
- final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
- final RMIClientFactory clientSocketFactory =
- new RMIClientFactory(networkSecurityManager.getConfiguration());
- stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort, clientSocketFactory,
- serverSocketFactory);
- } else {
- stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort);
- }
+ IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort,
+ RMIClientFactory.getSocketFactory(networkSecurityManager),
+ RMIServerFactory.getSocketFactory(networkSecurityManager));
LOGGER.info("Asterix Distributed State Proxy Bound");
return stub;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
index a8d027f..a50fd4a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
@@ -20,6 +20,7 @@
package org.apache.asterix.metadata.entities;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -42,7 +43,7 @@
this.nodeNames = nodeNames;
}
- public static NodeGroup createOrdered(String groupName, List<String> nodeNames) {
+ public static NodeGroup createOrdered(String groupName, Collection<String> nodeNames) {
List<String> sortedNodeNames = new ArrayList<>(nodeNames);
Collections.sort(sortedNodeNames);
return new NodeGroup(groupName, sortedNodeNames);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 0cbdc24..88eafcd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -672,7 +672,7 @@
nodeGroup = nodeGroup + "_" + UUID.randomUUID();
appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
}
- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, new ArrayList<>(ncNames)));
+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, ncNames));
return nodeGroup;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..f7a739b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -20,7 +20,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -29,6 +32,7 @@
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.hyracks.api.network.ISocketChannel;
@@ -41,6 +45,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private final Set<IPartitionReplica> replicas = new HashSet<>();
private final InetSocketAddress inputLocation;
+ private final Map<ReplicaIdentifier, ArrayDeque<PartitionReplica>> replicasConnPool = new HashMap<>();
private InetSocketAddress resolvedLocation;
private ISocketChannel logRepChannel;
@@ -64,6 +69,11 @@
@Override
public synchronized void remove(IPartitionReplica replica) {
replicas.remove(replica);
+ ArrayDeque<PartitionReplica> partitionConnections = replicasConnPool.remove(replica.getIdentifier());
+ if (partitionConnections != null) {
+ partitionConnections.forEach(PartitionReplica::close);
+ partitionConnections.clear();
+ }
}
@Override
@@ -138,4 +148,47 @@
public int hashCode() {
return Objects.hash(inputLocation);
}
+
+ /**
+ * Hands out a connection for the replica identified by {@code identifier}: a pooled connection when one is
+ * available, otherwise a newly created {@link PartitionReplica}.
+ *
+ * @param identifier the replica the connection is for
+ * @param appCtx the application context passed to a newly created {@link PartitionReplica}
+ * @return a connection that the caller should hand back through {@link #recycleConnection(PartitionReplica)}
+ */
+ public synchronized PartitionReplica getPartitionReplicaConnection(ReplicaIdentifier identifier,
+ INcApplicationContext appCtx) {
+ ArrayDeque<PartitionReplica> partitionReplicas =
+ replicasConnPool.computeIfAbsent(identifier, k -> new ArrayDeque<>());
+ if (!partitionReplicas.isEmpty()) {
+ return partitionReplicas.remove();
+ }
+ return new PartitionReplica(identifier, appCtx);
+ }
+
+ /**
+ * Returns {@code partitionReplica} to the pool of its replica so that it can be reused. When that pool no
+ * longer exists, e.g. because {@link #remove(IPartitionReplica)} dropped it while the connection was in use,
+ * the connection is closed instead of being silently dropped.
+ *
+ * @param partitionReplica the connection to hand back
+ */
+ public synchronized void recycleConnection(PartitionReplica partitionReplica) {
+ ArrayDeque<PartitionReplica> partitionReplicas = replicasConnPool.get(partitionReplica.getIdentifier());
+ if (partitionReplicas == null) {
+ // nothing references this connection anymore; close it rather than leak it
+ partitionReplica.close();
+ return;
+ }
+ partitionReplicas.add(partitionReplica);
+ }
+
+ /**
+ * Closes every pooled connection of this destination. The closed entries are left in the pool.
+ */
+ public synchronized void closeConnections() {
+ replicasConnPool
+ .forEach(((identifier, partitionReplicas) -> partitionReplicas.forEach(PartitionReplica::close)));
+ }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index 063709a..8c514bd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -21,22 +21,21 @@
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicationDestination;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
-import org.apache.asterix.common.storage.ResourceReference;
-import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.api.ReplicationDestination;
-import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -45,13 +44,15 @@
private static final Logger LOGGER = LogManager.getLogger();
private final IReplicationManager replicationManager;
- private final Set<ReplicationDestination> destinations = new HashSet<>();
+ private final Set<ReplicationDestination> destinations = ConcurrentHashMap.newKeySet();
private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>();
private final IReplicationStrategy replicationStrategy;
private final PersistentLocalResourceRepository resourceRepository;
private final INcApplicationContext appCtx;
+ private final ILSMIOOperationScheduler ioScheduler;
private final Object transferLock = new Object();
private final Set<ReplicationDestination> failedDest = new HashSet<>();
+ private final AtomicInteger pendingRepOpsCount = new AtomicInteger();
public IndexReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
this.appCtx = appCtx;
@@ -59,6 +60,8 @@
this.resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
replicationStrategy = replicationManager.getReplicationStrategy();
appCtx.getThreadExecutor().execute(new ReplicationJobsProcessor());
+ ioScheduler = appCtx.getStorageComponentProvider().getIoOperationSchedulerProvider()
+ .getIoScheduler(appCtx.getServiceContext());
}
public void register(ReplicationDestination dest) {
@@ -72,12 +75,18 @@
public void unregister(IReplicationDestination dest) {
synchronized (transferLock) {
LOGGER.info(() -> "unregister " + dest);
+ for (ReplicationDestination existingDest : destinations) {
+ if (existingDest.equals(dest)) {
+ existingDest.closeConnections();
+ break;
+ }
+ }
destinations.remove(dest);
failedDest.remove(dest);
}
}
- private void handleFailure(ReplicationDestination dest, Exception e) {
+ public void handleFailure(ReplicationDestination dest, Exception e) {
synchronized (transferLock) {
if (failedDest.contains(dest)) {
return;
@@ -87,6 +96,7 @@
LOGGER.error("replica at {} failed", dest);
failedDest.add(dest);
}
+ dest.closeConnections();
replicationManager.notifyFailure(dest, e);
}
}
@@ -99,71 +109,62 @@
process(job);
}
- private void process(IReplicationJob job) {
- try {
- if (skip(job)) {
- return;
- }
- synchronized (transferLock) {
- if (destinations.isEmpty()) {
- return;
- }
- final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
- final int indexPartition = getJobPartition(job);
- for (ReplicationDestination dest : destinations) {
- try {
- Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
- if (!partitionReplica.isPresent()) {
- continue;
- }
- PartitionReplica replica = (PartitionReplica) partitionReplica.get();
- synchronizer.sync(replica);
- } catch (Exception e) {
- handleFailure(dest, e);
- }
- }
- closeChannels();
- }
- } finally {
- afterReplication(job);
+ public Set<ReplicationDestination> getDestinations() {
+ synchronized (transferLock) {
+ return destinations;
}
}
- private boolean skip(IReplicationJob job) {
- try {
- final String fileToReplicate = job.getAnyFile();
- final Optional<DatasetResourceReference> indexFileRefOpt =
- resourceRepository.getLocalResourceReference(fileToReplicate);
- if (!indexFileRefOpt.isPresent()) {
- LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
- return true;
+ private void process(IReplicationJob job) {
+ pendingRepOpsCount.incrementAndGet();
+ Optional<DatasetResourceReference> jobIndexRefOpt = getJobIndexRef(job);
+ if (jobIndexRefOpt.isEmpty()) {
+ LOGGER.warn("skipping replication of {} due to missing dataset resource reference", job.getAnyFile());
+ afterReplication(job);
+ return;
+ }
+ ReplicationOperation rp = new ReplicationOperation(appCtx, jobIndexRefOpt.get(), job, this);
+ if (job.getExecutionType() == IReplicationJob.ReplicationExecutionType.SYNC) {
+ rp.call();
+ } else {
+ try {
+ ioScheduler.scheduleOperation(rp);
+ } catch (HyracksDataException e) {
+ throw new ReplicationException(e);
}
- return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+ }
+ }
+
+ public boolean skip(DatasetResourceReference indexRef) {
+ return !replicationStrategy.isMatch(indexRef.getDatasetId());
+ }
+
+ public Optional<DatasetResourceReference> getJobIndexRef(IReplicationJob job) {
+ final String fileToReplicate = job.getAnyFile();
+ try {
+ return resourceRepository.getLocalResourceReference(fileToReplicate);
} catch (HyracksDataException e) {
throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
}
}
- private int getJobPartition(IReplicationJob job) {
- return ResourceReference.of(job.getAnyFile()).getPartitionNum();
- }
-
private void closeChannels() {
- if (!replicationJobsQ.isEmpty()) {
- return;
- }
LOGGER.trace("no pending replication jobs; closing connections to replicas");
for (ReplicationDestination dest : destinations) {
- dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
+ dest.closeConnections();
}
}
- private static void afterReplication(IReplicationJob job) {
+ public void afterReplication(IReplicationJob job) {
try {
+ int pendingOps = pendingRepOpsCount.decrementAndGet();
if (job.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE
&& job instanceof ILSMIndexReplicationJob) {
((ILSMIndexReplicationJob) job).endReplication();
}
+ if (pendingOps == 0 && replicationJobsQ.isEmpty()) {
+ closeChannels();
+ }
} catch (HyracksDataException e) {
throw new ReplicationException(e);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
new file mode 100644
index 0000000..258f24a
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.management;
+
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractIoOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallbackFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * An IO operation that runs an {@link IndexSynchronizer} for a replication job against every destination that
+ * holds a replica of the job's index partition.
+ */
+public class ReplicationOperation extends AbstractIoOperation {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ // callback passed to the parent operation; obtained once from the no-op callback factory
+ private static final ILSMIOOperationCallback NO_OP_CALLBACK =
+ NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(null);
+ private final INcApplicationContext appCtx;
+ private final DatasetResourceReference indexRef;
+ private final IReplicationJob job;
+ private final IndexReplicationManager indexReplicationManager;
+
+ /**
+ * @param appCtx the application context handed to the synchronizer and to replica connections
+ * @param indexRef the reference of the index that the job belongs to
+ * @param job the replication job to run
+ * @param indexReplicationManager the manager that supplies the destinations and is told about failures and
+ * about the end of the job
+ */
+ public ReplicationOperation(INcApplicationContext appCtx, DatasetResourceReference indexRef, IReplicationJob job,
+ IndexReplicationManager indexReplicationManager) {
+ super(null, null, NO_OP_CALLBACK, indexRef.getRelativePath().toString());
+ this.appCtx = appCtx;
+ this.indexRef = indexRef;
+ this.job = job;
+ this.indexReplicationManager = indexReplicationManager;
+ }
+
+ @Override
+ public LSMIOOperationType getIOOpertionType() {
+ return LSMIOOperationType.REPLICATE;
+ }
+
+ /**
+ * Runs the job against each destination unless there are no destinations or the manager skips the index.
+ * A failure while connecting or syncing to one destination is reported to the manager and does not stop the
+ * remaining ones; the manager's {@code afterReplication} is invoked in all cases.
+ *
+ * @return {@code SUCCESS} whenever the method returns normally
+ */
+ @Override
+ public LSMIOOperationStatus call() {
+ try {
+ Set<ReplicationDestination> targets = indexReplicationManager.getDestinations();
+ boolean nothingToDo = targets.isEmpty() || indexReplicationManager.skip(indexRef);
+ if (!nothingToDo) {
+ LOGGER.debug("started replicate operation on index {}", indexRef);
+ final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
+ final int indexPartition = indexRef.getPartitionId();
+ for (ReplicationDestination target : targets) {
+ replicateTo(target, indexPartition, synchronizer);
+ }
+ LOGGER.debug("completed replicate operation on index {}", indexRef);
+ }
+ return LSMIOOperationStatus.SUCCESS;
+ } finally {
+ indexReplicationManager.afterReplication(job);
+ }
+ }
+
+ /**
+ * Syncs the job to {@code dest} if it holds a replica of {@code indexPartition}. The connection is recycled on
+ * success; on failure it is closed and the failure is reported to the manager.
+ */
+ private void replicateTo(ReplicationDestination dest, int indexPartition, IndexSynchronizer synchronizer) {
+ Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
+ if (partitionReplica.isEmpty()) {
+ return;
+ }
+ PartitionReplica connection = null;
+ try {
+ connection = dest.getPartitionReplicaConnection(partitionReplica.get().getIdentifier(), appCtx);
+ synchronizer.sync(connection);
+ dest.recycleConnection(connection);
+ } catch (Exception e) {
+ if (connection != null) {
+ connection.close();
+ }
+ indexReplicationManager.handleFailure(dest, e);
+ }
+ }
+
+ @Override
+ protected LSMComponentFileReferences getComponentFiles() {
+ return null;
+ }
+
+ @Override
+ public long getRemainingPages() {
+ return 0;
+ }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 459ff01..68ccd54 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -103,6 +103,6 @@
private void waitForReplicatedDatasetsIO() throws HyracksDataException {
// wait for IO operations to ensure replicated datasets files won't change during replica sync
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
- appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+ appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, replica.getIdentifier().getPartition());
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index a104ae3..827b713 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IJsonSerializable;
@@ -46,7 +47,8 @@
public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
- return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId));
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId), partition);
}
@Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8a1cc65..1c614c9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.logging;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -26,6 +28,7 @@
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Logger;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
@@ -33,12 +36,16 @@
public class LogManagerWithReplication extends LogManager {
+ private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
private IReplicationManager replicationManager;
private IReplicationStrategy replicationStrategy;
private final LongSet replicatedTxn = LongSets.synchronize(new LongOpenHashSet());
+ private final long replicationTimeoutMillis;
public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
super(txnSubsystem);
+ replicationTimeoutMillis = TimeUnit.SECONDS
+ .toMillis(txnSubsystem.getApplicationContext().getReplicationProperties().getReplicationTimeOut());
}
@SuppressWarnings("squid:S2445")
@@ -94,8 +101,18 @@
//wait for job Commit/Abort ACK from replicas
if (logRecord.isReplicate() && (logRecord.getLogType() == LogType.JOB_COMMIT
|| logRecord.getLogType() == LogType.ABORT)) {
+ long replicationTimeOut = replicationTimeoutMillis;
while (!logRecord.isReplicated()) {
- logRecord.wait();
+ if (replicationTimeOut <= 0) {
+ LOGGER.warn(
+ "{} ms passed without receiving acks for log {}; setting log as replicated due to timeout",
+ replicationTimeoutMillis, logRecord.getLogRecordForDisplay());
+ logRecord.setReplicated(true);
+ continue;
+ }
+ final long startTime = System.nanoTime();
+ logRecord.wait(replicationTimeOut);
+ replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
}
}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 681e9a8..30a987e 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -2021,12 +2021,12 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>9.4.48.v20220622</version>
+ <version>9.4.51.v20230217</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util-ajax</artifactId>
- <version>9.4.48.v20220622</version>
+ <version>9.4.51.v20230217</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index 5d61da3..81ed7f0 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -163,9 +163,9 @@
<artifactId>netty-transport</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -175,9 +175,9 @@
<artifactId>netty-transport-classes-epoll</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -187,9 +187,9 @@
<artifactId>netty-transport-native-unix-common</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -199,9 +199,9 @@
<artifactId>netty-codec</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -211,9 +211,9 @@
<artifactId>netty-codec-dns</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -223,9 +223,9 @@
<artifactId>netty-codec-http2</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -235,9 +235,9 @@
<artifactId>netty-codec-socks</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -247,9 +247,9 @@
<artifactId>netty-handler</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -259,9 +259,9 @@
<artifactId>netty-handler-proxy</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -271,9 +271,9 @@
<artifactId>netty-buffer</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -283,9 +283,9 @@
<artifactId>netty-common</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -295,9 +295,9 @@
<artifactId>netty-codec-http</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -307,9 +307,9 @@
<artifactId>netty-resolver</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -319,9 +319,9 @@
<artifactId>netty-resolver-dns</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -331,9 +331,9 @@
<artifactId>netty-transport-native-unix-common</artifactId>
<properties>
<!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
- <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
- <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+ <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
@@ -463,9 +463,10 @@
<properties>
<!-- snappy-java is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
<!-- license override not needed, ALv2 is specified in its pom.xml -->
- <!-- see https://raw.githubusercontent.com/xerial/snappy-java/1.1.8.4/LICENSE -->
- <license.ignoreMissingEmbeddedLicense>1.1.8.4</license.ignoreMissingEmbeddedLicense>
- <license.ignoreMissingEmbeddedNotice>1.1.8.4</license.ignoreMissingEmbeddedNotice>
+ <!-- see https://raw.githubusercontent.com/xerial/snappy-java/v1.1.10.1/LICENSE -->
+ <license.ignoreMissingEmbeddedLicense>1.1.10.1</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>1.1.10.1</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreNoticeOverride>1.1.10.1</license.ignoreNoticeOverride>
</properties>
</project>
</supplement>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.87.Final_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.94.Final_NOTICE.txt
similarity index 100%
rename from asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.87.Final_NOTICE.txt
rename to asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.94.Final_NOTICE.txt
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_xerial_snappy-java_v1.1.10.1_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_xerial_snappy-java_v1.1.10.1_NOTICE.txt
new file mode 100644
index 0000000..19301705
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_xerial_snappy-java_v1.1.10.1_NOTICE.txt
@@ -0,0 +1,16 @@
+This product includes software developed by Google
+ Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+This product includes software developed by Apache
+ PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+ (Apache 2.0 license)
+
+This library containd statically linked libstdc++. This inclusion is allowed by
+"GCC RUntime Library Exception"
+http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+== Contributors ==
+ * Tatu Saloranta
+ * Providing benchmark suite
+ * Alec Wysoker
+ * Performance and memory usage improvement
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
index d74f500..75fbb92 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
@@ -25,6 +25,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.Objects;
import org.apache.hyracks.api.io.IWritable;
@@ -82,6 +83,10 @@
return inetSocketAddress;
}
+ public InetSocketAddress toInetSocketAddress() {
+ return new InetSocketAddress(address, port);
+ }
+
public int getPort() {
return port;
}
@@ -102,7 +107,7 @@
return false;
}
NetworkAddress on = (NetworkAddress) o;
- return on.port == port && on.address == address;
+ return on.port == port && Objects.equals(on.address, address);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
index b483158..7fc0335 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
@@ -20,7 +20,9 @@
import java.io.File;
import java.io.Serializable;
+import java.net.InetAddress;
import java.security.KeyStore;
+import java.util.Optional;
public interface INetworkSecurityConfig extends Serializable {
@@ -65,4 +67,11 @@
* @return the trust store file
*/
File getTrustStoreFile();
+
+ /**
+ * The optional address to bind for RMI server sockets; or absent to bind to all addresses / interfaces.
+ *
+ * @return the optional bind address
+ */
+ Optional<InetAddress> getRMIBindAddress();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 7d04cf2..57f9750 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.util.ComputingAction;
import org.apache.hyracks.util.IDelay;
import org.apache.hyracks.util.IOInterruptibleAction;
+import org.apache.hyracks.util.IOThrowingAction;
import org.apache.hyracks.util.IRetryPolicy;
import org.apache.hyracks.util.InterruptibleAction;
import org.apache.hyracks.util.Span;
@@ -188,7 +189,7 @@
}
}
- @SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, instanceof of exception
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
Throwable savedT = null;
boolean suppressedInterrupted = false;
@@ -225,6 +226,43 @@
}
}
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+ public static void tryIoWithCleanups(IOThrowingAction action, IOThrowingAction... cleanups) throws IOException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (IOThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else if (savedT instanceof IOException) {
+ throw (IOException) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
/**
* Runs the supplied action, after suspending any pending interruption. An error will be logged if
* the action is itself interrupted.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index e3135df..9a08e8e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -47,4 +47,9 @@
callback.setException(e);
}
}
+
+ @Override
+ public String toString() {
+ return getName() + " jobId:" + jobId;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 727793b..c3a09f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -37,7 +37,7 @@
public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
private static final Logger LOGGER = LogManager.getLogger();
- private JobId jobId;
+ private final JobId jobId;
public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) {
super(ccs, nodeId, null);
@@ -77,4 +77,9 @@
}
}
}
+
+ @Override
+ public String toString() {
+ return getName() + " jobId:" + jobId + ", nodeId:" + nodeId;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index b1700fe..ec21785 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.cc.work;
+import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
@@ -50,8 +51,9 @@
protected void doRun() throws Exception {
String id = reg.getNodeId();
LOGGER.info("registering node: {}", id);
- NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
- ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress().resolveInetSocketAddress()));
+ InetSocketAddress ncAddress = reg.getNodeControllerAddress().toInetSocketAddress();
+ NodeControllerRemoteProxy nc =
+ new NodeControllerRemoteProxy(ccs.getCcId(), ccs.getClusterIPC().getReconnectingHandle(ncAddress));
INodeManager nodeManager = ccs.getNodeManager();
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 14d92fb..ed3e574 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -87,4 +87,9 @@
});
}
}
+
+ @Override
+ public String toString() {
+ return getName() + " jobId:" + jobId;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 8c9cbfb..f69d106 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -426,7 +426,7 @@
NodeParameters nodeParameters = ccc.getNodeParameters();
// Start heartbeat generator.
heartbeatManagers.computeIfAbsent(ccId, newCcId -> HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
- nodeRegistration.getNodeControllerAddress().resolveInetSocketAddress()));
+ nodeRegistration.getNodeControllerAddress().toInetSocketAddress()));
if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
Timer ccTimer = new Timer("Timer-" + ccId, true);
// Schedule profile dump generator.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index f47e1ce..bfe3706 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -70,4 +70,9 @@
"Joblet couldn't be found. Tasks of job " + jobId + " have all either completed or failed");
}
}
+
+ @Override
+ public String toString() {
+ return getName() + " jobId:" + jobId;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index ae2cfa0..75edd38 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -35,7 +35,7 @@
private final JobId jobId;
- private JobStatus status;
+ private final JobStatus status;
public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
this.ncs = ncs;
@@ -54,4 +54,9 @@
joblet.cleanup(status);
}
}
+
+ @Override
+ public String toString() {
+ return getName() + " jobId:" + jobId + ", status:" + status;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index f6c144d..f277046 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -315,4 +315,9 @@
}
return channelsForInputConnectors;
}
+
+ @Override
+ public String toString() {
+ return getName() + " jobId:" + jobId;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index e00c519..740af2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -25,6 +25,8 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.logging.log4j.Level;
@@ -45,6 +47,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.ReferenceCountUtil;
/**
* A chunked http response. Here is how it is expected to work:
@@ -114,28 +117,40 @@
@Override
public void close() throws IOException {
- if (writer != null) {
- writer.close();
- } else {
- outputStream.close();
- }
- if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
- if (!done) {
- respond(LastHttpContent.EMPTY_LAST_CONTENT);
- }
- } else {
- // There was an error
- if (headerSent) {
- LOGGER.log(Level.WARN, "Error after header write of chunked response");
- if (errorBuf != null) {
- errorBuf.release();
+ try {
+ InvokeUtil.tryIoWithCleanups(() -> {
+ if (writer != null) {
+ writer.close();
+ } else {
+ outputStream.close();
}
- future = ctx.channel().close().addListener(handler);
- } else {
- // we didn't send anything to the user, we need to send an non-chunked error response
- fullResponse(response.protocolVersion(), response.status(),
- errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
- }
+ if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
+ if (!done) {
+ respond(LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+ } else {
+ // There was an error
+ if (headerSent) {
+ LOGGER.log(Level.WARN, "Error after header write of chunked response");
+ future = ctx.channel().close().addListener(handler);
+ } else {
+ // we didn't send anything to the user, so we need to send a non-chunked error response
+ fullResponse(response.protocolVersion(), response.status(),
+ errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
+ // The responsibility of releasing the error buffer is now with the netty pipeline since it is
+ // forwarded within the http content. We must nullify buffer to avoid releasing the buffer twice.
+ errorBuf = null;
+ }
+ }
+ }, outputStream::close, () -> {
+ ReferenceCountUtil.release(errorBuf);
+ // We must nullify buffer to avoid releasing the buffer twice in case of duplicate close()
+ errorBuf = null;
+ });
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
}
done = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
index 2170c15..bfcd623 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
@@ -22,10 +22,12 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.net.InetAddress;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
+import java.util.Optional;
import org.apache.hyracks.api.network.INetworkSecurityConfig;
@@ -90,6 +92,11 @@
return trustStoreFile;
}
+ @Override
+ public Optional<InetAddress> getRMIBindAddress() {
+ return Optional.empty();
+ }
+
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
writeStore(keyStore, out);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index e266a6f..049da38 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -38,9 +38,10 @@
private final int maxNumFlushes;
protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
+ protected final Map<String, ILSMIOOperation> runningReplicateOperations = new HashMap<>();
protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>();
protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>();
-
+ protected final Deque<ILSMIOOperation> waitingReplicateOperations = new ArrayDeque<>();
protected final Map<String, Throwable> failedGroups = new HashMap<>();
public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback,
@@ -58,8 +59,11 @@
case MERGE:
scheduleMerge(operation);
break;
+ case REPLICATE:
+ scheduleReplicate(operation);
+ break;
case NOOP:
- return;
+ break;
default:
// this should never happen
// just guard here to avoid silent failures in case of future extensions
@@ -75,6 +79,10 @@
break;
case MERGE:
completeMerge(operation);
+ break;
+ case REPLICATE:
+ completeReplicate(operation);
+ break;
case NOOP:
return;
default:
@@ -149,6 +157,46 @@
}
}
+ private void scheduleReplicate(ILSMIOOperation operation) {
+ String id = operation.getIndexIdentifier();
+ synchronized (executor) {
+ if (runningReplicateOperations.size() >= maxNumFlushes || runningReplicateOperations.containsKey(id)) {
+ waitingReplicateOperations.add(operation);
+ } else {
+ runningReplicateOperations.put(id, operation);
+ executor.submit(operation);
+ }
+ }
+ }
+
+ private void completeReplicate(ILSMIOOperation operation) {
+ String id = operation.getIndexIdentifier();
+ synchronized (executor) {
+ runningReplicateOperations.remove(id);
+ // Schedule waiting replicates in FIFO order, keeping at most one running replicate per index.
+ for (ILSMIOOperation replicateOp : waitingReplicateOperations) {
+ String replicateOpId = replicateOp.getIndexIdentifier();
+ if (runningReplicateOperations.size() < maxNumFlushes) {
+ if (!runningReplicateOperations.containsKey(replicateOpId) && !replicateOp.isCompleted()) {
+ runningReplicateOperations.put(replicateOpId, replicateOp);
+ executor.submit(replicateOp);
+ }
+ } else {
+ break;
+ }
+ }
+ // Drop completed or now-running replicates from the head of the waiting queue.
+ while (!waitingReplicateOperations.isEmpty()) {
+ ILSMIOOperation top = waitingReplicateOperations.peek();
+ if (top.isCompleted() || runningReplicateOperations.get(top.getIndexIdentifier()) == top) {
+ waitingReplicateOperations.poll();
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
executor.shutdown();
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
new file mode 100644
index 0000000..b9430d8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOThrowingAction {
+ void run() throws IOException; // NOSONAR
+
+ static ComputingAction<Void> asComputingAction(IOThrowingAction action) {
+ return () -> {
+ action.run();
+ return null;
+ };
+ }
+}
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index df23608..5ed10e8 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -73,10 +73,10 @@
<hadoop.version>3.3.1</hadoop.version>
<jacoco.version>0.7.6.201602180812</jacoco.version>
<log4j.version>2.19.0</log4j.version>
- <snappy.version>1.1.8.4</snappy.version>
- <jackson.version>2.14.1</jackson.version>
+ <snappy.version>1.1.10.1</snappy.version>
+ <jackson.version>2.14.3</jackson.version>
<jackson-databind.version>${jackson.version}</jackson-databind.version>
- <netty.version>4.1.87.Final</netty.version>
+ <netty.version>4.1.94.Final</netty.version>
<implementation.title>Apache Hyracks and Algebricks - ${project.name}</implementation.title>
<implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -112,6 +112,16 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-codec-dns</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-socks</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
@@ -127,6 +137,11 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-resolver</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
<version>${netty.version}</version>
</dependency>
@@ -482,12 +497,12 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>9.4.48.v20220622</version>
+ <version>9.4.51.v20230217</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util-ajax</artifactId>
- <version>9.4.48.v20220622</version>
+ <version>9.4.51.v20230217</version>
</dependency>
<!-- Manually included to avoid CVE-2023-1370 -->
<dependency>