Merge branch 'gerrit/stabilization-40cfb8705b' into 'gerrit/neo'
Change-Id: Id982c1f25f98cd2b1a61f321214e5a9c953a5a2e
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
index 069ba49..bcb59b4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
@@ -194,7 +194,7 @@
AggregateOperator agg = (AggregateOperator) nestedPlanRoot;
Mutable<ILogicalOperator> aggInputOpRef = agg.getInputs().get(0);
- if (agg.getVariables().size() > 1) {
+ if (agg.getVariables().size() != 1) {
return false;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
index f968b35..0f490be 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveRedundantListifyRule.java
@@ -200,7 +200,7 @@
return false;
}
AggregateOperator agg = (AggregateOperator) r;
- if (agg.getVariables().size() > 1) {
+ if (agg.getVariables().size() != 1) {
return false;
}
LogicalVariable aggVar = agg.getVariables().get(0);
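Both rule changes above tighten the same guard, from `size() > 1` to `size() != 1`. A minimal standalone sketch of why that matters (a plain `List<String>` stands in for the rule's variable list; nothing here is AsterixDB API): the code following the guard calls `get(0)`, so a zero-variable aggregate must be rejected too, and the old `> 1` check let it fall through.

```java
import java.util.List;

class SingleVariableGuardSketch {
    // was: vars.size() > 1, which let an empty list fall through to get(0)
    static boolean matchesExactlyOne(List<String> vars) {
        if (vars.size() != 1) {
            return false;
        }
        String only = vars.get(0); // safe: size() == 1 is guaranteed here
        return only != null;
    }

    public static void main(String[] args) {
        System.out.println(matchesExactlyOne(List.of()));          // false: old guard crashed here
        System.out.println(matchesExactlyOne(List.of("$$agg")));   // true
        System.out.println(matchesExactlyOne(List.of("a", "b")));  // false
    }
}
```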
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 626b938..63d8846 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -153,9 +153,8 @@
@Override
public synchronized void notify(ActiveEvent event) {
try {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "EventListener is notified.");
- }
+ LOGGER.debug("CC handling event {}; state={}, prev state={}, suspended={}", event, state, prevState,
+ suspended);
ActiveEvent.Kind eventKind = event.getEventKind();
switch (eventKind) {
case JOB_CREATED:
@@ -194,26 +193,21 @@
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Active job {} finished", jobId);
- }
JobId lastJobId = jobId;
+ Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
if (numRegistered != numDeRegistered) {
LOGGER.log(Level.WARN,
- "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
- numRegistered, numDeRegistered);
+ "ingestion job {} finished with status={}, reported runtime registrations={}, deregistrations={}",
+ jobId, status, numRegistered, numDeRegistered);
}
jobId = null;
- Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus);
- }
+ LOGGER.debug("ingestion job {} finished with status {}", lastJobId, jobStatus);
if (!jobSuccessfullyTerminated(jobStatus)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
- LOGGER.error("Active Job {} failed", lastJobId, jobFailure);
+ LOGGER.error("ingestion job {} failed", lastJobId, jobFailure);
setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED
: ActivityState.TEMPORARILY_FAILED);
if (prevState == ActivityState.RUNNING) {
@@ -371,16 +365,14 @@
@Override
public synchronized void recover() {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Recover is called on {}", entityId);
- }
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
- LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
+ LOGGER.debug("recover is called on {} w/o recovery policy; setting to permanent failure", entityId);
setState(ActivityState.STOPPED);
} else {
+ LOGGER.debug("recover is called on {}", entityId);
ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
setState(ActivityState.TEMPORARILY_FAILED);
- LOGGER.log(level, "Recovery task has been submitted");
+ LOGGER.debug("recovery task has been submitted");
rt = createRecoveryTask();
executor.submit(rt.recover());
}
@@ -481,13 +473,9 @@
try {
Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId);
sendStopMessages(metadataProvider, timeout, unit);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Waiting for its state to become " + waitFor);
- }
+ LOGGER.debug("waiting for {} to become {}", jobId, waitFor);
subscriber.sync();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Disconnect has been completed " + waitFor);
- }
+ LOGGER.debug("disconnect has been completed {}", waitFor);
} catch (InterruptedException ie) {
forceStop(subscriber, ie);
Thread.currentThread().interrupt();
@@ -513,13 +501,8 @@
ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker();
AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations();
int partition = 0;
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
- }
+ LOGGER.log(Level.INFO, "Sending stop messages to {}", runtimeLocations);
for (String location : runtimeLocations.getLocations()) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Sending to " + location);
- }
ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
runtimeId, new StopRuntimeParameters(timeout, unit)), location);
@@ -581,14 +564,10 @@
WaitForStateSubscriber subscriber;
Future<Void> suspendTask;
synchronized (this) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "suspending entity " + entityId);
- LOGGER.log(level, "Waiting for ongoing activities");
- }
+ LOGGER.log(level, "{} Suspending entity {}", jobId, entityId);
+ LOGGER.log(level, "{} Waiting for ongoing activities", jobId);
waitForNonTransitionState();
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Proceeding with suspension. Current state is " + state);
- }
+ LOGGER.log(level, "{} Proceeding with suspension. Current state is {}", jobId, state);
if (state == ActivityState.STOPPED) {
suspended = true;
return;
@@ -609,12 +588,12 @@
doSuspend(metadataProvider);
return null;
});
- LOGGER.log(level, "Suspension task has been submitted");
+ LOGGER.log(level, "{} Suspension task has been submitted", jobId);
}
try {
- LOGGER.log(level, "Waiting for suspension task to complete");
+ LOGGER.log(level, "{} Waiting for suspension task to complete", jobId);
suspendTask.get();
- LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
+ LOGGER.log(level, "{} Waiting for state to become SUSPENDED or TEMPORARILY_FAILED", jobId);
subscriber.sync();
suspended = true;
} catch (Exception e) {
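The suspend path above submits a task and then waits twice: once for the task itself, once for the subscriber to observe the target state. A rough sketch of that flow under stated assumptions (an `ExecutorService` plus a `CountDownLatch` stand in for the controller executor and `WaitForStateSubscriber`; `doSuspend` is elided):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SuspendFlowSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch reachedTargetState = new CountDownLatch(1); // stand-in for WaitForStateSubscriber

        Future<Void> suspendTask = executor.submit(() -> {
            // doSuspend(metadataProvider) would run here
            reachedTargetState.countDown(); // state becomes SUSPENDED or TEMPORARILY_FAILED
            return null;
        });

        suspendTask.get();          // "Waiting for suspension task to complete"
        reachedTargetState.await(); // "Waiting for state to become SUSPENDED or TEMPORARILY_FAILED"
        System.out.println("suspended");
        executor.shutdown();
    }
}
```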
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 3c277d5..1f31698 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -92,16 +92,14 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "notifyJobCreation was called for job {}", jobId);
- }
Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (!(property instanceof EntityId)) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property);
+ if (property != null) {
+ LOGGER.debug("{} is not an active job. job property={}", jobId, property);
}
return;
}
+ LOGGER.debug("notified of ingestion job creation {}", jobId);
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
boolean found = jobId2EntityId.get(jobId) != null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 34a54d1..e4e7b36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -140,6 +140,7 @@
if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
listener.setState(ActivityState.RECOVERING);
listener.doRecover(metadataProvider);
+ listener.setRunning(metadataProvider, true);
}
LOGGER.log(level, "Recovery completed successfully");
return null;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 65d1039..04e661b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -82,4 +82,9 @@
}
}
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s, uuid=%s, contextId=%s, node=%s)", getClass().getSimpleName(), reqId, uuid,
+ contextId, nodeId);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
index d65ae31..a711b73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -49,4 +49,8 @@
return status;
}
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s, status=%s)", getClass().getSimpleName(), reqId, status);
+ }
}
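The two `toString()` additions follow one pattern: format the identifying fields once, so the CC-side logs introduced elsewhere in this change can print whole messages with a bare `{}` placeholder. A self-contained sketch (field values are hypothetical samples, not real message state):

```java
import java.util.UUID;

class CancelMessageToStringSketch {
    private final long reqId = 42L;                          // hypothetical sample values
    private final String uuid = UUID.randomUUID().toString();
    private final String contextId = "ctx-7";
    private final String nodeId = "nc1";

    @Override
    public String toString() {
        return String.format("%s(id=%s, uuid=%s, contextId=%s, node=%s)",
                getClass().getSimpleName(), reqId, uuid, contextId, nodeId);
    }

    public static void main(String[] args) {
        // a log statement can now pass the whole message as one argument
        System.out.println(new CancelMessageToStringSketch());
    }
}
```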
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 4d32967..61656b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -22,6 +22,7 @@
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Collection;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -160,6 +161,7 @@
private ICacheManager cacheManager;
private IConfigValidator configValidator;
private IDiskWriteRateLimiterProvider diskWriteRateLimiterProvider;
+ private Integer metadataPartitionId;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory) {
@@ -443,6 +445,7 @@
@Override
public void initializeMetadata(boolean newUniverse, int partitionId) throws Exception {
LOGGER.info("Bootstrapping metadata");
+ metadataPartitionId = partitionId;
MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(),
ncExtensionManager.getMetadataExtensions(), partitionId);
@@ -636,4 +639,9 @@
public IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider() {
return diskWriteRateLimiterProvider;
}
+
+ @Override
+ public OptionalInt getMetadataPartitionId() {
+ return metadataPartitionId == null ? OptionalInt.empty() : OptionalInt.of(metadataPartitionId);
+ }
}
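`getMetadataPartitionId()` maps a nullable `Integer` field onto `OptionalInt`, so callers never see `null`. A minimal sketch of the idiom, assuming only that the field is set once during metadata bootstrap:

```java
import java.util.OptionalInt;

class MetadataPartitionSketch {
    private volatile Integer metadataPartitionId; // null until initializeMetadata runs

    void initializeMetadata(int partitionId) {
        metadataPartitionId = partitionId;
    }

    OptionalInt getMetadataPartitionId() {
        Integer id = metadataPartitionId;         // read once for a consistent view
        return id == null ? OptionalInt.empty() : OptionalInt.of(id);
    }

    public static void main(String[] args) {
        MetadataPartitionSketch ctx = new MetadataPartitionSketch();
        System.out.println(ctx.getMetadataPartitionId()); // OptionalInt.empty
        ctx.initializeMetadata(0);
        System.out.println(ctx.getMetadataPartitionId()); // OptionalInt[0]
    }
}
```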
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.1.ddl.sqlpp
new file mode 100644
index 0000000..0b94eb1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.1.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-3403
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type dt1 as {id:int};
+create dataset collection1(dt1) primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.2.update.sqlpp
new file mode 100644
index 0000000..327d1d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.2.update.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into collection1
+([
+ {
+ "id": 1,
+ "f1": "f1",
+ "x1": [{"date":"01-02-2024", "item": '1234', "cnt":2}]
+
+ },
+ {
+ "id": 2,
+ "f1": "f1",
+ "x2": [{"date":"01-02-2024", "item": '5678', "cnt":2}]
+ },
+ {
+ "id": 3,
+ "f1": "f1",
+ "x3": [{"su": {"x4":2}, "item": "i1", "cnt":2}]
+ },
+ {
+ "id": 4,
+ "f1": "f1",
+ "x3": [{"su": {"x4":5}, "item": 1234, "cnt":2}]
+ }
+]);
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.3.query.sqlpp
new file mode 100644
index 0000000..ca70034
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.3.query.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: This test case is to verify the fix for ASTERIXDB-3403
+ */
+
+use test;
+
+ SELECT COUNT(id) AS matches
+ FROM collection1 AS d
+ WHERE d.`f1` = 'f1'
+ AND (ARRAY_SUM((
+ SELECT VALUE i.`cnt`
+ FROM d.`x1` AS i
+ WHERE i.`date` BETWEEN "01-01-2024" AND "02-02-2024"
+ AND i.`item` IN ['1234', '5678'] )) >= 1
+ OR ARRAY_SUM((
+ SELECT VALUE i.`cnt`
+ FROM d.`x2` AS i
+ WHERE i.`date` BETWEEN "01-01-2024" AND "02-02-2024"
+ AND i.`item` IN ['1234', '5678'] )) >= 1
+ OR (ANY e IN d.x3 SATISFIES e.item IN ['i1', 'i2', 'i3']
+ AND e.su.`x4` >= 1 END));
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.3.adm
new file mode 100644
index 0000000..433b8ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/query-ASTERIXDB-3403/query-ASTERIXDB-3403.3.adm
@@ -0,0 +1 @@
+{ "matches": 3 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index b6dbf7f..66e47ba 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -7300,6 +7300,11 @@
<output-dir compare="Text">query-ASTERIXDB-3418</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="misc">
+ <compilation-unit name="query-ASTERIXDB-3403">
+ <output-dir compare="Text">query-ASTERIXDB-3403</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="multipart-dataverse">
<test-case FilePath="multipart-dataverse">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 5475b97..a5ae099 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.rmi.RemoteException;
+import java.util.OptionalInt;
import java.util.concurrent.Executor;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -147,4 +148,6 @@
* @return the disk write rate limiter provider
*/
IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
+
+ OptionalInt getMetadataPartitionId();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index 0bb9bd5..1565730 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -173,13 +173,16 @@
// 2. there are still some active readers and memory cannot be reclaimed.
// But for both cases, we will notify all primary index op trackers to let their writers retry,
// if they have been blocked. Moreover, we will check whether more flushes are needed.
+ List<ILSMOperationTracker> opTrackers = new ArrayList<>();
synchronized (this) {
final int size = primaryIndexes.size();
for (int i = 0; i < size; i++) {
- ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
- synchronized (opTracker) {
- opTracker.notifyAll();
- }
+ opTrackers.add(primaryIndexes.get(i).getOperationTracker());
+ }
+ }
+ for (ILSMOperationTracker opTracker : opTrackers) {
+ synchronized (opTracker) {
+ opTracker.notifyAll();
}
}
checkAndNotifyFlushThread();
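The restructuring above is a snapshot-then-notify pattern: tracker references are copied under the cache monitor, and the per-tracker monitors are taken afterwards, one at a time, so the thread never holds the cache lock and a tracker lock simultaneously. A minimal sketch with plain `Object` trackers standing in for `ILSMOperationTracker`:

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotThenNotifySketch {
    private final List<Object> trackers = new ArrayList<>();

    void notifyAllTrackers() {
        List<Object> snapshot;
        synchronized (this) {                 // cache monitor: copy references only
            snapshot = new ArrayList<>(trackers);
        }
        for (Object tracker : snapshot) {     // tracker monitors taken one at a time,
            synchronized (tracker) {          // never nested inside the cache monitor
                tracker.notifyAll();
            }
        }
    }
}
```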
diff --git a/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md b/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md
index 0657fb0..755fd39 100644
--- a/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md
+++ b/asterixdb/asterix-doc/src/main/markdown/builtins/9_aggregate_sql.md
@@ -48,8 +48,8 @@
* or, a `missing` value.
* Return Value:
* a `bigint` value representing the number of non-null and non-missing items in the given collection,
- * `null` is returned if the input is `null` or `missing`,
- * any other non-array and non-multiset input value will cause an error.
+ * `0` is returned if the input is `null` or `missing`,
+ * `0` is returned if the input is not an array or a multiset.
* Example:
@@ -77,8 +77,8 @@
* a `double` value representing the average of the non-null and non-missing numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
- * any other non-numeric value in the input collection will cause a type error.
+ * `null` is returned if the input is not an array or a multiset,
+ * any other non-numeric value in the input collection will be ignored.
* Example:
@@ -107,8 +107,8 @@
items.
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
- * any other non-numeric value in the input collection will cause a type error.
+ * `null` is returned if the input is not an array or a multiset,
+ * any other non-numeric value in the input collection will be ignored.
* Example:
@@ -136,8 +136,8 @@
type promotion order (`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * multiple incomparable items in the input array or multiset will cause a type error,
- * any other non-array and non-multiset input value will cause a type error.
+ * `null` is returned if there are incomparable items in the input array or multiset,
+ * `null` is returned if the input is not an array or a multiset.
* Example:
@@ -165,8 +165,8 @@
type promotion order (`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * multiple incomparable items in the input array or multiset will cause a type error,
- * any other non-array and non-multiset input value will cause a type error.
+ * `null` is returned if there are incomparable items in the input array or multiset,
+ * `null` is returned if the input is not an array or a multiset.
* Example:
@@ -177,6 +177,44 @@
3.4
+### array_median ###
+ * Syntax:
+
+ array_median(num_collection)
+
+ * Gets the median value of the numeric items in the given collection, ignoring null, missing, and non-numeric items.
+
+ The function starts by sorting the numeric items.
+
+ - If there is an odd number of numeric items, the function returns the item that is exactly in the middle of the range: that is, it has the same number of items before and after.
+ - If there is an even number of numeric items, the function returns the mean of the two items that are exactly in the middle of the range.
+
+ * Note: You cannot use the `DISTINCT` keyword with this function, or with the `median` aggregation pseudo-function.
+ The `median` aggregation pseudo-function does support the `FILTER` clause.
+ There is no `strict_median` function corresponding to this function.
+ * Arguments:
+ * `num_collection` could be:
+ * an `array` or `multiset` of numbers,
+ * or, a `null` value,
+ * or, a `missing` value.
+ * Clauses: When used as a window function, this function supports the [Window Partition Clause](manual.html#Window_partition_clause), but not the [Window Order Clause](manual.html#Window_order_clause) or the [Window Frame Clause](manual.html#Window_frame_clause).
+ * Return Value:
+ * a `double` value representing the median of the numeric items in the given collection,
+ * `null` is returned if the input is `null` or `missing`,
+ * `null` is returned if the given collection does not contain any numeric items,
+ * `null` is returned if the input is not an array or a multiset,
+ * any other non-numeric value in the input collection will be ignored.
+ * Example:
+
+ { "v1": array_median( [1.2, 2.3, 3.4, 0, null, missing],
+ "v2": array_median( [1.2, 2.3, 3.4, 4.5, 0, null, missing] ) };
+
+ * The expected result is:
+
+ { "v1": 1.75,
+ "v2": 2.3 }
+
+
### array_stddev_samp ###
* Syntax:
@@ -193,7 +231,7 @@
* a `double` value representing the sample standard deviation of the non-null and non-missing numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -220,7 +258,7 @@
* a `double` value representing the population standard deviation of the non-null and non-missing numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -247,7 +285,7 @@
* a `double` value representing the sample variance of the non-null and non-missing numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -274,7 +312,7 @@
* a `double` value representing the population variance of the non-null and non-missing numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -301,7 +339,7 @@
* a `double` value representing the skewness of the non-null and non-missing numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -328,7 +366,7 @@
* a `double` value representing the kurtosis from a normal distribution of the non-null and non-missing numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if the given collection does not contain any non-null and non-missing items,
- * any other non-array and non-multiset input value will cause a type error,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -352,7 +390,8 @@
* or a `missing` value.
* Return Value:
* a `bigint` value representing the number of items in the given collection,
- * `null` is returned if the input is `null` or `missing`.
+ * `0` is returned if the input is `null` or `missing`,
+ * `0` is returned if the input is not an array or a multiset.
* Example:
@@ -377,7 +416,8 @@
* a `double` value representing the average of the numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
- * any other non-numeric value in the input collection will cause a type error.
+ * `null` is returned if the input is not an array or a multiset,
+ * `null` is returned if there are any other non-numeric values in the input collection.
* Example:
@@ -404,7 +444,8 @@
items.
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
- * any other non-numeric value in the input collection will cause a type error.
+ * `null` is returned if the input is not an array or a multiset,
+ * `null` is returned if there are any other non-numeric values in the input collection.
* Example:
@@ -431,8 +472,8 @@
(`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
- * multiple incomparable items in the input array or multiset will cause a type error,
- * any other non-array and non-multiset input value will cause a type error.
+ * `null` is returned if there are incomparable items in the input array or multiset,
+ * `null` is returned if the input is not an array or a multiset.
* Example:
@@ -460,8 +501,8 @@
(`tinyint`-> `smallint`->`integer`->`bigint`->`float`->`double`) among numeric items.
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
- * multiple incomparable items in the input array or multiset will cause a type error,
- * any other non-array and non-multiset input value will cause a type error.
+ * `null` is returned if there are incomparable items in the input array or multiset,
+ * `null` is returned if the input is not an array or a multiset.
* Example:
@@ -536,6 +577,7 @@
* a `double` value representing the sample variance of the numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -561,6 +603,7 @@
* a `double` value representing the population variance of the numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -586,6 +629,7 @@
* a `double` value representing the skewness of the numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
@@ -611,6 +655,7 @@
* a `double` value representing the kurtosis from a normal distribution of the numbers in the given collection,
* `null` is returned if the input is `null` or `missing`,
* `null` is returned if there is a `null` or `missing` in the input collection,
+ * `null` is returned if the input is not an array or a multiset,
* any other non-numeric value in the input collection will cause a type error.
* Example:
diff --git a/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md b/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md
index 04a65d1..2bd9f49 100644
--- a/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md
+++ b/asterixdb/asterix-doc/src/main/markdown/sqlpp/3_query.md
@@ -1215,20 +1215,21 @@
For example, `SELECT COUNT(*) FROM customers` simply returns the total number of customers, whereas `SELECT COUNT(rating) FROM customers` returns the number of customers who have known ratings (that is, their ratings are not `null` or `missing`).
-Because the aggregation pseudo-functions sometimes restructure their operands, they can be used only in query blocks where (explicit or implicit) grouping is being done. Therefore the pseudo-functions cannot operate directly on arrays or multisets. For operating directly on JSON collections, SQL++ provides a set of ordinary functions for computing aggregations. Each ordinary aggregation function (except the ones corresponding to `COUNT` and `ARRAY_AGG`) has two versions: one that ignores `null` and `missing` values and one that returns `null` if a `null` or `missing` value is encountered anywhere in the collection. The names of the aggregation functions are as follows:
+Because the aggregation pseudo-functions sometimes restructure their operands, they can be used only in query blocks where (explicit or implicit) grouping is being done. Therefore the pseudo-functions cannot operate directly on arrays or multisets. For operating directly on JSON collections, SQL++ provides a set of ordinary functions for computing aggregations. Each ordinary aggregation function (except as noted below) has two versions: one that ignores `null` and `missing` values, and one that returns `null` if a `null` or `missing` value is encountered anywhere in the collection. The names of the aggregation functions are as follows:
| Aggregation pseudo-function; operates on groups only | Ordinary function: Ignores NULL or MISSING values | Ordinary function: Returns NULL if NULL or MISSING are encountered|
|----------|----------|--------|
-|SUM| ARRAY_SUM| STRICT_SUM |
-| AVG |ARRAY_MAX| STRICT_MAX |
-| MAX | ARRAY_MIN| STRICT_MIN |
-| MIN | ARRAY_AVG| STRICT_AVG |
+| SUM | ARRAY_SUM| STRICT_SUM |
+| AVG | ARRAY_AVG| STRICT_AVG |
+| MAX | ARRAY_MAX| STRICT_MAX |
+| MIN | ARRAY_MIN| STRICT_MIN |
| COUNT |ARRAY_COUNT|STRICT_COUNT (see exception below) |
+| MEDIAN | ARRAY_MEDIAN | |
|STDDEV_SAMP|ARRAY_STDDEV_SAMP| STRICT_STDDEV_SAMP |
|STDDEV_POP|ARRAY_STDDEV_POP| STRICT_STDDEV_POP |
|VAR_SAMP|ARRAY_VAR_SAMP| STRICT_VAR_SAMP |
|VAR_POP|ARRAY_VAR_POP| STRICT_VAR_POP |
-|SKEWENESS|ARRAY_SKEWNESS| STRICT_SKEWNESS |
+|SKEWNESS|ARRAY_SKEWNESS| STRICT_SKEWNESS |
|KURTOSIS|ARRAY_KURTOSIS| STRICT_KURTOSIS |
| |ARRAY_AGG| |
diff --git a/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md b/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md
index d6e40e7..c41e8a3 100644
--- a/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md
+++ b/asterixdb/asterix-doc/src/main/markdown/sqlpp/4_windowfunctions.md
@@ -108,7 +108,7 @@
The *window order clause* determines how tuples are ordered within each partition. The window function works on tuples in the order specified by this clause.
-This clause may be used with any [window function](builtins.html#WindowFunctions), or any [aggregate function](builtins.html#AggregateFunctions) used as a window function.
+This clause may be used with any [window function](builtins.html#WindowFunctions) and with most [aggregate functions](builtins.html#AggregateFunctions) — refer to the descriptions of individual functions for more details.
This clause is optional. If omitted, all tuples are considered peers, i.e. their order is tied. When tuples in the window partition are tied, each window function behaves differently.
@@ -130,7 +130,7 @@
##### WindowFrameClause

-The *window frame clause* defines the window frame. It can be used with all [aggregate functions](builtins.html#AggregateFunctions) and some [window functions](builtins.html#WindowFunctions) — refer to the descriptions of individual functions for more details. It is optional and allowed only when the [window order clause](#Window_order_clause) is present.
+The *window frame clause* defines the window frame. It can be used with some [window functions](builtins.html#WindowFunctions) and most [aggregate functions](builtins.html#AggregateFunctions) — refer to the descriptions of individual functions for more details. It is optional and allowed only when the [window order clause](#Window_order_clause) is present.
* If this clause is omitted and there is no [window order clause](#Window_order_clause), the window frame is the entire partition.
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index 0f5949e..97d03e6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -51,6 +51,11 @@
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
LOGGER.debug("processing {}", this);
+ if (appCtx.getMetadataPartitionId().isPresent() && appCtx.getMetadataPartitionId().getAsInt() == partition
+ && appCtx.getReplicaManager().getPartitions().contains(partition)) {
+ LOGGER.warn("received request to get metadata files from non-master {}", worker.getRemoteAddress());
+ throw new IllegalStateException();
+ }
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
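The new guard refuses to serve metadata files when this node itself owns the requested metadata partition. A hedged standalone sketch of the same condition (stub parameters replace `appCtx` and the replica manager; the message text mirrors the one above):

```java
import java.util.OptionalInt;
import java.util.Set;

class MetadataFilesGuardSketch {
    static void check(OptionalInt metadataPartition, Set<Integer> ownedPartitions,
            int requestedPartition, String remoteAddress) {
        if (metadataPartition.isPresent() && metadataPartition.getAsInt() == requestedPartition
                && ownedPartitions.contains(requestedPartition)) {
            System.err.println("received request to get metadata files from non-master " + remoteAddress);
            throw new IllegalStateException();
        }
    }

    public static void main(String[] args) {
        check(OptionalInt.of(0), Set.of(1, 2), 0, "10.0.0.5"); // passes: partition not owned here
        check(OptionalInt.of(0), Set.of(0), 0, "10.0.0.5");    // throws: this node is the master
    }
}
```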
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
index ac87f7e..ef348fe 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -64,16 +64,16 @@
final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
try {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO,
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.log(Level.TRACE,
ctx.getTaskContext().getTaskAttemptId() + " sleeping for " + time + " ms");
}
Thread.sleep(time);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO,
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.log(Level.TRACE,
ctx.getTaskContext().getTaskAttemptId() + " done sleeping for " + time + " ms");
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index b123a5e..ae903d1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -77,6 +77,11 @@
ensureMaxCapacity();
}
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ return resourceManager.getCurrentCapacity();
+ }
+
private void ensureMaxCapacity() {
final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 9f4541f5..0c74260 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -74,7 +74,7 @@
}
protected void flushIfNotFailed() throws HyracksDataException {
- if (!failed && appender.getTupleCount() > 0) {
+ if (!failed && appender != null && appender.getTupleCount() > 0) {
flushAndReset();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index 9e38a20..b18bcb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -24,6 +24,11 @@
public class DefaultJobCapacityController implements IJobCapacityController {
public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController();
+ private static final IClusterCapacity CAPACITY = new ClusterCapacity();
+ static {
+ CAPACITY.setAggregatedCores(Integer.MAX_VALUE);
+ CAPACITY.setAggregatedMemoryByteSize(Long.MAX_VALUE);
+ }
private DefaultJobCapacityController() {
}
@@ -37,4 +42,9 @@
public void release(JobSpecification job) {
// No operation here.
}
+
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ return CAPACITY;
+ }
}
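The default controller now exposes an effectively unbounded capacity as a shared constant, so callers of `getClusterCapacity()` need no null handling. A sketch with a tiny stand-in for `IClusterCapacity`:

```java
class UnboundedCapacitySketch {
    static final class ClusterCapacity {       // stand-in for Hyracks' capacity type
        long memoryByteSize;
        int cores;
    }

    // mirrors DefaultJobCapacityController: a shared constant meaning "no limit"
    static final ClusterCapacity CAPACITY = new ClusterCapacity();
    static {
        CAPACITY.cores = Integer.MAX_VALUE;
        CAPACITY.memoryByteSize = Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(CAPACITY.cores + " cores, " + CAPACITY.memoryByteSize + " bytes");
    }
}
```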
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index 5fa4bd9..f88baa2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -57,4 +57,10 @@
*/
void release(JobSpecification job);
+ /**
+ * The cluster current capacity.
+ *
+ * @return the cluster current capacity.
+ */
+ IReadOnlyClusterCapacity getClusterCapacity();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acd..631f226 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -539,7 +539,7 @@
private void abortTaskCluster(TaskClusterAttempt tcAttempt,
TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
- LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt());
+ LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt());
Set<TaskAttemptId> abortTaskIds = new HashSet<>();
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
@@ -561,14 +561,12 @@
}
}
final JobId jobId = jobRun.getJobId();
- LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+ LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap);
INodeManager nodeManager = ccs.getNodeManager();
abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
final NodeControllerState node = nodeManager.getNodeControllerState(key);
if (node != null) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + key);
- }
+ LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key);
try {
node.getNodeController().abortTasks(jobId, abortTaskAttempts);
} catch (Exception e) {
@@ -579,8 +577,8 @@
inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
TaskCluster tc = tcAttempt.getTaskCluster();
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
- pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
- pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+ pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds, jobId);
+ pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds, jobId);
tcAttempt.setStatus(failedOrAbortedStatus);
tcAttempt.setEndTime(System.currentTimeMillis());
@@ -683,7 +681,6 @@
*/
public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
try {
- LOGGER.debug("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTask().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
@@ -696,7 +693,7 @@
LOGGER.trace(() -> "Marking TaskAttempt " + ta.getTaskAttemptId()
+ " as failed and the number of max re-attempts = " + maxReattempts);
if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
- LOGGER.debug(() -> "Aborting the job of " + ta.getTaskAttemptId());
+ LOGGER.debug("Aborting the job:{} of {}", jobRun.getJobId(), ta.getTaskAttemptId());
abortJob(exceptions, NoOpCallback.INSTANCE);
return;
}
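Several changes in this file swap string concatenation and supplier lambdas for parameterized messages. With log4j2, format arguments are only rendered when the level is enabled, which is why the explicit `isTraceEnabled()` guards could be dropped. A minimal illustration (logger setup only; the arguments are placeholders):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class ParameterizedLoggingSketch {
    private static final Logger LOGGER = LogManager.getLogger();

    void abortExample(Object attempts, String nodeId) {
        // log4j2 defers formatting until TRACE is enabled, so neither an
        // isTraceEnabled() wrapper nor a Supplier lambda is needed here
        LOGGER.trace("Aborting: {} at {}", attempts, nodeId);
    }
}
```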
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4a..2347449 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -38,7 +38,9 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
@@ -129,7 +131,7 @@
if (activeRunMap.containsKey(jobId)) {
JobRun jobRun = activeRunMap.get(jobId);
// The following call will abort all ongoing tasks and then consequently
- // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
+ // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
jobRun.getExecutor().cancelJob(callback);
return;
@@ -139,7 +141,7 @@
if (jobRun != null) {
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
- // Since the job has not been executed, we only need to update its status and lifecyle here.
+ // Since the job has not been executed, we only need to update its status and lifecycle here.
jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
@@ -170,7 +172,6 @@
return;
}
if (run.getPendingStatus() != null) {
- LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId);
return;
}
Set<String> targetNodes = run.getParticipatingNodeIds();
@@ -313,6 +314,7 @@
run.setStartTime(System.currentTimeMillis());
run.setStartTimeZoneId(ZoneId.systemDefault().getId());
JobId jobId = run.getJobId();
+ logJobCapacity(run, "running");
activeRunMap.put(jobId, run);
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
@@ -320,6 +322,7 @@
// Queue a job when the required capacity for the job is not met.
private void queueJob(JobRun jobRun) throws HyracksException {
+ logJobCapacity(jobRun, "queueing");
jobRun.setStatus(JobStatus.PENDING, null);
jobQueue.add(jobRun);
}
@@ -355,5 +358,23 @@
private void releaseJobCapacity(JobRun jobRun) {
final JobSpecification job = jobRun.getJobSpecification();
jobCapacityController.release(job);
+ logJobCapacity(jobRun, "released");
+ }
+
+ private void logJobCapacity(JobRun jobRun, String jobStateDesc) {
+ IClusterCapacity requiredResources = jobRun.getJobSpecification().getRequiredClusterCapacity();
+ if (requiredResources == null) {
+ return;
+ }
+ long requiredMemory = requiredResources.getAggregatedMemoryByteSize();
+ int requiredCPUs = requiredResources.getAggregatedCores();
+ if (requiredMemory == 0 && requiredCPUs == 0) {
+ return;
+ }
+ IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity();
+ LOGGER.info("{} {}, memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}",
+ jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs,
+ clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(),
+ getRunningJobsCount(), jobQueue.size());
}
}
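`logJobCapacity` deliberately stays silent for jobs that declare no resource requirement, so the capacity log tracks only jobs that actually reserve memory or cores. A small sketch of just that filtering decision (boxed parameters stand in for the `IClusterCapacity` lookup):

```java
class CapacityLogFilterSketch {
    // mirrors logJobCapacity's early returns: no requirement object, or an
    // all-zero requirement, produces no log line at all
    static boolean shouldLog(Long requiredMemory, Integer requiredCpus) {
        if (requiredMemory == null || requiredCpus == null) {
            return false;
        }
        return requiredMemory != 0 || requiredCpus != 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldLog(0L, 0));              // false: trivial job
        System.out.println(shouldLog(1_073_741_824L, 8));  // true: logged at queue/run/release
    }
}
```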
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index ac29b53..8f91944 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -28,6 +28,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.PartitionDescriptor;
import org.apache.hyracks.control.common.job.PartitionRequest;
@@ -43,14 +44,13 @@
private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
public PartitionMatchMaker() {
- partitionDescriptors = new HashMap<PartitionId, List<PartitionDescriptor>>();
- partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>();
+ partitionDescriptors = new HashMap<>();
+ partitionRequests = new HashMap<>();
}
public List<Pair<PartitionDescriptor, PartitionRequest>> registerPartitionDescriptor(
PartitionDescriptor partitionDescriptor) {
- List<Pair<PartitionDescriptor, PartitionRequest>> matches =
- new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
+ List<Pair<PartitionDescriptor, PartitionRequest>> matches = new ArrayList<>();
PartitionId pid = partitionDescriptor.getPartitionId();
boolean matched = false;
List<PartitionRequest> requests = partitionRequests.get(pid);
@@ -73,11 +73,7 @@
}
if (!matched) {
- List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
- if (descriptors == null) {
- descriptors = new ArrayList<PartitionDescriptor>();
- partitionDescriptors.put(pid, descriptors);
- }
+ List<PartitionDescriptor> descriptors = partitionDescriptors.computeIfAbsent(pid, k -> new ArrayList<>());
descriptors.add(partitionDescriptor);
}
@@ -108,11 +104,7 @@
}
if (match == null) {
- List<PartitionRequest> requests = partitionRequests.get(pid);
- if (requests == null) {
- requests = new ArrayList<PartitionRequest>();
- partitionRequests.put(pid, requests);
- }
+ List<PartitionRequest> requests = partitionRequests.computeIfAbsent(pid, k -> new ArrayList<>());
requests.add(partitionRequest);
}
@@ -133,17 +125,11 @@
}
private interface IEntryFilter<T> {
- public boolean matches(T o);
+ boolean matches(T o);
}
private static <T> void removeEntries(List<T> list, IEntryFilter<T> filter) {
- Iterator<T> j = list.iterator();
- while (j.hasNext()) {
- T o = j.next();
- if (filter.matches(o)) {
- j.remove();
- }
- }
+ list.removeIf(filter::matches);
}
private static <T> void removeEntries(Map<PartitionId, List<T>> map, IEntryFilter<T> filter) {
@@ -159,30 +145,16 @@
}
public void notifyNodeFailures(final Collection<String> deadNodes) {
- removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
- @Override
- public boolean matches(PartitionDescriptor o) {
- return deadNodes.contains(o.getNodeId());
- }
- });
- removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
- @Override
- public boolean matches(PartitionRequest o) {
- return deadNodes.contains(o.getNodeId());
- }
- });
+ removeEntries(partitionDescriptors, o -> deadNodes.contains(o.getNodeId()));
+ removeEntries(partitionRequests, o -> deadNodes.contains(o.getNodeId()));
}
- public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing uncommitted partitions: " + partitionIds);
+ public void removeUncommittedPartitions(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
+ if (!partitionIds.isEmpty()) {
+ LOGGER.debug("Removing uncommitted partitions {}: {}", jobId, partitionIds);
}
- IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
- @Override
- public boolean matches(PartitionDescriptor o) {
- return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
- }
- };
+ IEntryFilter<PartitionDescriptor> filter =
+ o -> o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
for (PartitionId pid : partitionIds) {
List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
if (descriptors != null) {
@@ -194,16 +166,11 @@
}
}
- public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Removing partition requests: " + partitionIds);
+ public void removePartitionRequests(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
+ if (!partitionIds.isEmpty()) {
+ LOGGER.debug("Removing partition requests {}: {}", jobId, partitionIds);
}
- IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
- @Override
- public boolean matches(PartitionRequest o) {
- return taIds.contains(o.getRequestingTaskAttemptId());
- }
- };
+ IEntryFilter<PartitionRequest> filter = o -> taIds.contains(o.getRequestingTaskAttemptId());
for (PartitionId pid : partitionIds) {
List<PartitionRequest> requests = partitionRequests.get(pid);
if (requests != null) {
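The PartitionMatchMaker cleanup is a mechanical modernization: `computeIfAbsent` replaces the get/null-check/put sequence, `removeIf` replaces the hand-written iterator loop, and anonymous `IEntryFilter` classes become lambdas. A runnable sketch of the two collection idioms (hypothetical string keys instead of `PartitionId`):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MatchMakerIdiomsSketch {
    public static void main(String[] args) {
        Map<String, List<String>> requests = new HashMap<>();

        // computeIfAbsent replaces the get / null-check / put dance
        requests.computeIfAbsent("pid-1", k -> new ArrayList<>()).add("req-A");
        requests.computeIfAbsent("pid-1", k -> new ArrayList<>()).add("req-B");

        // removeIf replaces the explicit Iterator loop behind removeEntries()
        requests.get("pid-1").removeIf(r -> r.endsWith("-A"));

        System.out.println(requests); // {pid-1=[req-B]}
    }
}
```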
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e2..46dd351 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -45,7 +45,6 @@
import org.apache.hyracks.control.common.result.AbstractResultManager;
import org.apache.hyracks.control.common.result.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -79,9 +78,7 @@
@Override
public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
- }
+ LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), jobId);
if (jobResultLocations.get(jobId) != null) {
throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
}
@@ -157,15 +154,14 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
- Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
- Level logLevel = Level.DEBUG;
- if (LOGGER.isEnabled(logLevel)) {
- LOGGER.log(logLevel, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), ex);
- }
ResultJobRecord rjr = getResultJobRecord(jobId);
+ if (logFailure(rjr)) {
+ LOGGER.debug("job {} failed and is being reported to {}", jobId, getClass().getSimpleName());
+ }
if (rjr != null) {
rjr.fail(exceptions);
}
+ Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
if (jobResultInfo != null) {
jobResultInfo.setException(ex);
@@ -211,6 +207,15 @@
}
}
+ private static boolean logFailure(ResultJobRecord rjr) {
+ if (rjr == null) {
+ return true;
+ }
+ // don't re-log if the state is already failed
+ ResultJobRecord.Status status = rjr.getStatus();
+ return status == null || status.getState() != State.FAILED;
+ }
+
/**
* Compares the records already known by the client for the given job's result set id with the records that the
* result directory service knows and if there are any newly discovered records returns a whole array with the
@@ -264,7 +269,7 @@
class JobResultInfo {
- private ResultJobRecord record;
+ private final ResultJobRecord record;
private Waiters waiters;
private Exception exception;
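Most of the logging changes in this merge follow the same recipe as notifyJobCreation above: replace an explicit level guard plus string concatenation with Log4j2 `{}` placeholders, which defer message formatting until the level check passes. A before/after sketch; the jobId value is made up for illustration:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParameterizedLoggingSketch {
    private static final Logger LOGGER = LogManager.getLogger();

    public static void main(String[] args) {
        String jobId = "JID:42"; // hypothetical value for illustration
        // Before: the guard existed only to avoid building the message string
        // when DEBUG is off.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("notified of new job " + jobId);
        }
        // After: {} placeholders defer formatting until Log4j2 has checked the
        // level, so the guard is redundant when the arguments are cheap.
        LOGGER.debug("notified of new job {}", jobId);
    }
}
```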
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 260c6b9..38277c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -121,4 +121,9 @@
public void clear() {
jobListMap.clear();
}
+
+ @Override
+ public int size() {
+ return jobListMap.size();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
index be40883..1f2c29a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -73,4 +73,11 @@
* Clears the job queue
*/
void clear();
+
+ /**
+ * Returns the number of queued jobs.
+ *
+ * @return the number of queued jobs.
+ */
+ int size();
}
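size() gives the scheduler a cheap view of queue depth. One plausible, hypothetical use, with the interface reduced to the new method; the admission check and its limit are assumptions, not part of this change:

```java
public class JobQueueSizeSketch {
    // Reduced stand-in for the extended interface; only size() matters here.
    interface IJobQueue {
        int size();
    }

    // Hypothetical admission check built on the new method.
    static void admit(IJobQueue queue, int maxQueuedJobs) {
        if (queue.size() >= maxQueuedJobs) {
            throw new IllegalStateException("job queue full: " + queue.size() + " jobs");
        }
    }

    public static void main(String[] args) {
        IJobQueue queue = () -> 3; // a queue reporting three pending jobs
        admit(queue, 10); // passes: 3 < 10
    }
}
```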
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 771832e..4f77914 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -34,8 +35,8 @@
public class ApplicationMessageWork extends AbstractHeartbeatWork {
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
String nodeId) {
@@ -61,6 +62,11 @@
return getName() + ": nodeID: " + nodeId;
}
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
+
private static void notifyMessageBroker(ICCServiceContext ctx, IMessage msg, String nodeId) {
final ExecutorService executor = ctx.getControllerService().getExecutor();
executor.execute(() -> {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index c36b887..73c0c7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -24,10 +24,11 @@
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
public class GetNodeControllersInfoWork extends AbstractWork {
private final INodeManager nodeManager;
- private IResultCallback<Map<String, NodeControllerInfo>> callback;
+ private final IResultCallback<Map<String, NodeControllerInfo>> callback;
public GetNodeControllersInfoWork(INodeManager nodeManager,
IResultCallback<Map<String, NodeControllerInfo>> callback) {
@@ -39,4 +40,9 @@
public void run() {
callback.setValue(nodeManager.getNodeControllerInfoMap());
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 77d2f82..6454804 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -34,11 +34,11 @@
public class JobCleanupWork extends AbstractWork {
private static final Logger LOGGER = LogManager.getLogger();
- private IJobManager jobManager;
- private JobId jobId;
- private JobStatus status;
- private List<Exception> exceptions;
- private IResultCallback<Void> callback;
+ private final IJobManager jobManager;
+ private final JobId jobId;
+ private final JobStatus status;
+ private final List<Exception> exceptions;
+ private final IResultCallback<Void> callback;
public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions,
IResultCallback<Void> callback) {
@@ -51,9 +51,6 @@
@Override
public void run() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Cleanup for job: {}", jobId);
- }
final JobRun jobRun = jobManager.get(jobId);
if (jobRun == null) {
LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index ee10669..cbdf98a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -71,6 +71,6 @@
@Override
public Level logLevel() {
- return Level.DEBUG;
+ return Level.TRACE;
}
}
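ApplicationMessageWork, GetNodeControllersInfoWork, and RemoveDeadNodesWork all override logLevel() to TRACE, which quiets works that fire on every heartbeat or poll. A sketch of how a work executor might honor that hook; the dispatcher below is a hypothetical stand-in, not the actual Hyracks work queue:

```java
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class WorkLogLevelSketch {
    private static final Logger LOGGER = LogManager.getLogger();

    // Minimal stand-in for AbstractWork: each work advertises how loudly its
    // scheduling should be logged.
    abstract static class AbstractWork implements Runnable {
        Level logLevel() {
            return Level.DEBUG; // assumed default; chatty works override to TRACE
        }
    }

    // Hypothetical executor: logs at the level the work itself chose, so
    // frequent works drop to TRACE without touching the dispatch code.
    static void schedule(AbstractWork work) {
        LOGGER.log(work.logLevel(), "executing {}", work);
        work.run();
    }

    public static void main(String[] args) {
        schedule(new AbstractWork() {
            @Override
            Level logLevel() {
                return Level.TRACE; // like the works demoted above
            }

            @Override
            public void run() {
                // work body elided
            }
        });
    }
}
```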
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 833066e..33d391f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,17 +22,13 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
- private static final Logger LOGGER = LogManager.getLogger();
+
private final List<Exception> exceptions;
public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -43,9 +39,6 @@
@Override
protected void performEvent(TaskAttempt ta) {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN,
- "Executing task failure work for " + this, ex);
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
if (run == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 85661fe..9c5a9fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -148,7 +148,7 @@
if (configured) {
throw new IllegalStateException("configuration already processed");
}
- LOGGER.debug("registering option: " + option.toIniString());
+ LOGGER.trace("registering option: {}", option::toIniString);
Map<String, IOption> optionMap = sectionMap.computeIfAbsent(option.section(), section -> new HashMap<>());
IOption prev = optionMap.put(option.ini(), option);
if (prev != null) {
@@ -160,8 +160,13 @@
registeredOptions.add(option);
optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
if (LOGGER.isDebugEnabled()) {
- optionSetters.put(option, (node, value, isDefault) -> LOGGER.debug("{} {} to {} for node {}",
- isDefault ? "defaulting" : "setting", option.toIniString(), value, node));
+ optionSetters.put(option, (node, value, isDefault) -> {
+ if (isDefault) {
+ LOGGER.trace("defaulting {} to {} for node {}", option.toIniString(), value, node);
+ } else {
+ LOGGER.debug("setting {} to {} for node {}", option.toIniString(), value, node);
+ }
+ });
}
}
}
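The registration message above uses Log4j2's other lazy form: option::toIniString is passed as a java.util.function.Supplier, so the INI rendering is computed only when TRACE is actually enabled. A sketch with a hypothetical rendering:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LazyParamSketch {
    private static final Logger LOGGER = LogManager.getLogger();

    // Stand-in for an option whose INI rendering is not free to compute.
    static String toIniString() {
        return "[nc] command=hyracksnc"; // hypothetical rendering
    }

    public static void main(String[] args) {
        // The method reference binds to the Supplier<?> overload of trace():
        // Log4j2 only invokes it after the TRACE level check passes, so the
        // rendering cost is skipped entirely at the default level.
        LOGGER.trace("registering option: {}", LazyParamSketch::toIniString);
    }
}
```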
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index d1f7d5a..c99898d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -70,6 +70,8 @@
InvokeUtil.runWithTimeout(() -> {
this.wait(REGISTRATION_RESPONSE_POLL_PERIOD); // NOSONAR while loop in timeout call
}, () -> !registrationPending, 1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
registrationException = e;
}
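The new catch clause keeps an interrupt from being swallowed into registrationException and retried; interruption now propagates to the caller. A condensed sketch of the ordering, with the timed wait reduced to a plain sleep for illustration:

```java
public class InterruptRethrowSketch {
    // Sketch of the catch-ordering fix: an interrupt must propagate to the
    // caller instead of being recorded as an ordinary registration failure.
    static void awaitRegistration() throws Exception {
        Exception registrationException = null;
        try {
            Thread.sleep(1000); // stands in for the timed wait loop
        } catch (InterruptedException e) {
            throw e; // propagate: the thread is being shut down
        } catch (Exception e) {
            registrationException = e; // only non-interrupt failures are kept
        }
        if (registrationException != null) {
            throw registrationException;
        }
    }

    public static void main(String[] args) throws Exception {
        awaitRegistration();
    }
}
```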
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index eee8950..e52e3ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -52,7 +52,6 @@
private boolean failed;
protected boolean flushRequest;
private boolean deallocated;
- private Level openCloseLevel = Level.DEBUG;
private Thread dataConsumerThread;
public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
@@ -181,9 +180,6 @@
@Override
public void open() throws HyracksDataException {
- if (LOGGER.isEnabled(openCloseLevel)) {
- LOGGER.log(openCloseLevel, "open(" + pid + " by " + taId);
- }
size = 0;
eos = false;
failed = false;
@@ -215,9 +211,6 @@
@Override
public void close() throws HyracksDataException {
- if (LOGGER.isEnabled(openCloseLevel)) {
- LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId);
- }
if (writeHandle != null) {
ctx.getIoManager().close(writeHandle);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 6d4f173..d1360d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -30,10 +30,10 @@
public class ApplicationMessageWork extends AbstractWork {
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
- private String nodeId;
- private NodeControllerService ncs;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
+ private final String nodeId;
+ private final NodeControllerService ncs;
public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) {
this.ncs = ncs;
@@ -62,4 +62,9 @@
public String toString() {
return getName() + ": nodeId: " + nodeId;
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index b0c60aa..cd79da7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.ErrorMessageUtil;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
@@ -50,9 +50,6 @@
@Override
public void run() {
- Exception ex = exceptions.get(0);
- LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN, "task " + taskId + " has failed",
- ex);
try {
IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager();
if (resultPartitionManager != null) {
@@ -69,6 +66,8 @@
@Override
public String toString() {
- return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]";
+ return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]"
+ + ((exceptions != null && !exceptions.isEmpty())
+ ? " " + ErrorMessageUtil.getCauseMessage(exceptions.get(0)) : "");
}
}
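With the up-front WARN removed, the failure cause now travels in toString(), which is what the work queue prints when the work is scheduled. ErrorMessageUtil.getCauseMessage is the real hyracks-api helper; the version below is a hypothetical reading of it for a runnable sketch, and the actual behavior may differ:

```java
public class CauseMessageSketch {
    // Hypothetical stand-in for ErrorMessageUtil.getCauseMessage: walk to the
    // root cause and prefer its message for a compact, single-line diagnostic.
    static String getCauseMessage(Throwable t) {
        Throwable cause = t;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return String.valueOf(cause.getMessage());
    }

    public static void main(String[] args) {
        Exception e = new RuntimeException("wrapper", new IllegalStateException("disk full"));
        // toString-style suffix as built in the rewritten NotifyTaskFailureWork
        System.out.println("NotifyTaskFailureWork: [nc1[JID:7:TAID:0]] " + getCauseMessage(e));
    }
}
```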
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 555e8fb..0f31491 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -319,9 +319,6 @@
if (!failed) {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("OptimizedHybridHashJoin closed its build phase");
- }
} else {
state.hybridHJ.clearBuildTempFiles();
}
@@ -402,10 +399,6 @@
writer.open();
state.hybridHJ.initProbe(probComp);
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase.");
- }
}
@Override
@@ -416,7 +409,7 @@
@Override
public void fail() throws HyracksDataException {
failed = true;
- if (state.hybridHJ != null) {
+ if (state != null && state.hybridHJ != null) {
state.hybridHJ.fail();
}
writer.fail();
@@ -427,12 +420,13 @@
if (failed) {
try {
// Clear temp files if fail() was called.
- state.hybridHJ.clearBuildTempFiles();
- state.hybridHJ.clearProbeTempFiles();
+ if (state != null && state.hybridHJ != null) {
+ state.hybridHJ.clearBuildTempFiles();
+ state.hybridHJ.clearProbeTempFiles();
+ }
} finally {
writer.close(); // writer should always be closed.
}
- logProbeComplete();
return;
}
try {
@@ -477,17 +471,7 @@
// Re-throw the whatever is caught.
throw e;
} finally {
- try {
- logProbeComplete();
- } finally {
- writer.close();
- }
- }
- }
-
- private void logProbeComplete() {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("OptimizedHybridHashJoin closed its probe phase");
+ writer.close();
}
}
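fail() and close() can now run before open() ever created the state object (for example when an upstream operator fails first), so each cleanup step is null-guarded while the writer is still closed unconditionally. A reduced sketch of the hardened close path, with stand-in types:

```java
public class GuardedCloseSketch {
    // Stand-in for the join's task state.
    static class JoinState {
        void clearBuildTempFiles() { /* delete spilled build files */ }
        void clearProbeTempFiles() { /* delete spilled probe files */ }
    }

    // Sketch of the hardened close(): every cleanup step is guarded against a
    // missing state, but the writer is closed in finally no matter what.
    static void closeAfterFailure(JoinState state, AutoCloseable writer) throws Exception {
        try {
            if (state != null) {
                state.clearBuildTempFiles();
                state.clearProbeTempFiles();
            }
        } finally {
            writer.close(); // writer should always be closed
        }
    }

    public static void main(String[] args) throws Exception {
        // fail() arrived before open(): state is null, writer still closes.
        closeAfterFailure(null, () -> System.out.println("writer closed"));
    }
}
```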
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index 08f15b3..a1704ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -41,13 +41,9 @@
import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory;
import org.apache.hyracks.dataflow.std.structures.MaxHeap;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class TupleSorterHeapSort implements ITupleSorter {
- private static final Logger LOGGER = LogManager.getLogger();
-
class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> {
@Override
public IResetableComparable<HeapEntry> createResetableComparable() {
@@ -288,7 +284,6 @@
int maxFrameSize = outputFrame.getFrameSize();
int numEntries = heap.getNumEntries();
IResetableComparable[] entries = heap.getEntries();
- int io = 0;
for (int i = 0; i < numEntries; i++) {
HeapEntry minEntry = (HeapEntry) entries[i];
bufferAccessor1.reset(minEntry.tuplePointer);
@@ -296,14 +291,10 @@
bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
if (flushed > 0) {
maxFrameSize = Math.max(maxFrameSize, flushed);
- io++;
}
}
maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
outputAppender.write(writer, true);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
- }
return maxFrameSize;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index be22b9c..7a75a0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -34,7 +34,9 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.IResultSetReader;
import org.apache.hyracks.client.result.ResultSet;
@@ -254,6 +256,14 @@
public void release(JobSpecification job) {
}
+
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ ClusterCapacity clusterCapacity = new ClusterCapacity();
+ clusterCapacity.setAggregatedMemoryByteSize(maxRAM);
+ clusterCapacity.setAggregatedCores(Integer.MAX_VALUE);
+ return clusterCapacity;
+ }
};
}
}
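The test's capacity controller now reports an effectively unbounded cluster, so admission control never rejects integration-test jobs. A reduced stand-in sketch of the stub; ClusterCapacity is trimmed to the two fields the test sets, and the admission predicate is a hypothetical addition for illustration:

```java
public class UnboundedTestCapacitySketch {
    // Trimmed stand-in for org.apache.hyracks.api.job.resource.ClusterCapacity.
    static class ClusterCapacity {
        private long aggregatedMemoryByteSize;
        private int aggregatedCores;

        void setAggregatedMemoryByteSize(long bytes) {
            aggregatedMemoryByteSize = bytes;
        }

        void setAggregatedCores(int cores) {
            aggregatedCores = cores;
        }

        boolean canAdmit(long requiredBytes, int requiredCores) {
            // hypothetical admission predicate, not part of this change
            return requiredBytes <= aggregatedMemoryByteSize && requiredCores <= aggregatedCores;
        }
    }

    public static void main(String[] args) {
        ClusterCapacity capacity = new ClusterCapacity();
        capacity.setAggregatedMemoryByteSize(Runtime.getRuntime().maxMemory());
        capacity.setAggregatedCores(Integer.MAX_VALUE); // effectively unbounded
        System.out.println(capacity.canAdmit(64L << 20, 8)); // admits any sane test job
    }
}
```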