[NO ISSUE] Minor refactoring, fixes, utility functions
Change-Id: I1843705a5a934bb4814ea7e3239d970f47c298f3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8763
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index 1f3daac..26e7fd5 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -52,7 +52,7 @@
*/
SUSPENDING,
/**
- * The activitiy has been suspended successfully. Next state must be resuming
+ * The activity has been suspended successfully. Next state must be resuming
*/
SUSPENDED,
/**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 276a04b..0f5dec7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -85,7 +86,7 @@
protected final MetadataProvider metadataProvider;
protected final IHyracksClientConnection hcc;
protected final EntityId entityId;
- private final List<Dataset> datasets;
+ private final Set<Dataset> datasets;
protected final ActiveEvent statsUpdatedEvent;
protected final String runtimeName;
protected final IRetryPolicyFactory retryPolicyFactory;
@@ -118,7 +119,7 @@
this.metadataProvider = MetadataProvider.create(appCtx, null);
this.hcc = hcc;
this.entityId = entityId;
- this.datasets = datasets;
+ this.datasets = new HashSet<>(datasets);
this.retryPolicyFactory = retryPolicyFactory;
this.state = ActivityState.STOPPED;
this.statsTimestamp = -1;
@@ -251,19 +252,19 @@
}
@Override
- public synchronized void remove(Dataset dataset) throws HyracksDataException {
+ public synchronized boolean remove(Dataset dataset) throws HyracksDataException {
if (isActive()) {
throw new RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY, entityId, state);
}
- getDatasets().remove(dataset);
+ return getDatasets().remove(dataset);
}
@Override
- public synchronized void add(Dataset dataset) throws HyracksDataException {
+ public synchronized boolean add(Dataset dataset) throws HyracksDataException {
if (isActive()) {
throw new RuntimeDataException(ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY, entityId, state);
}
- getDatasets().add(dataset);
+ return getDatasets().add(dataset);
}
public JobId getJobId() {
@@ -676,7 +677,7 @@
}
@Override
- public List<Dataset> getDatasets() {
+ public Set<Dataset> getDatasets() {
return datasets;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 0a7cad6..8b74e07 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
@@ -287,7 +288,7 @@
}
LOGGER.log(level, "Acquiring locks");
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName);
- List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
+ Set<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
for (Dataset dataset : datasets) {
if (targetDataset != null && targetDataset.equals(dataset)) {
// DDL operation already acquired the proper lock for the operation
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index fe4cdc5..a18cd45 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -65,10 +65,11 @@
}
@Override
- public synchronized void remove(Dataset dataset) throws HyracksDataException {
+ public synchronized boolean remove(Dataset dataset) throws HyracksDataException {
super.remove(dataset);
feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName())
&& o.getDatasetName().equals(dataset.getDatasetName()));
+ return false;
}
public synchronized void addFeedConnection(FeedConnection feedConnection) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
index 1e2a795..bc2c110 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.test.active;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.Semaphore;
@@ -54,7 +55,7 @@
String entityName = actionListener.getEntityId().getEntityName();
try {
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
- List<Dataset> datasets = actionListener.getDatasets();
+ Collection<Dataset> datasets = actionListener.getDatasets();
for (Dataset dataset : datasets) {
lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(),
dataset.getDatasetName());
@@ -77,7 +78,7 @@
String entityName = actionListener.getEntityId().getEntityName();
try {
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
- List<Dataset> datasets = actionListener.getDatasets();
+ Collection<Dataset> datasets = actionListener.getDatasets();
for (Dataset dataset : datasets) {
lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(),
dataset.getDatasetName());
@@ -98,7 +99,7 @@
protected void doExecute(MetadataProvider mdProvider) throws Exception {
DataverseName dataverseName = actionListener.getEntityId().getDataverseName();
String entityName = actionListener.getEntityId().getEntityName();
- List<Dataset> datasets = actionListener.getDatasets();
+ Collection<Dataset> datasets = actionListener.getDatasets();
try {
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
for (Dataset dataset : datasets) {
@@ -125,7 +126,7 @@
String entityName = actionListener.getEntityId().getEntityName();
try {
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
- List<Dataset> datasets = actionListener.getDatasets();
+ Collection<Dataset> datasets = actionListener.getDatasets();
for (Dataset dataset : datasets) {
lockManager.upgradeDatasetLockToWrite(mdProvider.getLocks(), dataset.getDataverseName(),
dataset.getDatasetName());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
index baface2..c355b43 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.metadata.api;
-import java.util.List;
+import java.util.Set;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -76,30 +76,32 @@
*
* @param dataset
* the dataset to add
+ * @return <code>true</code> if the active entity did not already contain the dataset
* @throws HyracksDataException
* if the entity is active
*/
- void add(Dataset dataset) throws HyracksDataException;
+ boolean add(Dataset dataset) throws HyracksDataException;
/**
* Remove dataset from the list of associated datasets
*
* @param dataset
* the dataset to add
+ * @return <code>true</code> if the active entity contained the dataset
* @throws HyracksDataException
* if the entity is active
*/
- void remove(Dataset dataset) throws HyracksDataException;
+ boolean remove(Dataset dataset) throws HyracksDataException;
/**
* @return the list of associated datasets
*/
- List<Dataset> getDatasets();
+ Set<Dataset> getDatasets();
/**
* replace the dataset object with the passed updated object
*
- * @param target
+ * @param dataset
*/
void replace(Dataset dataset);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 46a9b66..7b9f632 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.Serializable;
-import java.util.Objects;
import org.apache.hyracks.api.util.ErrorMessageUtil;
@@ -154,11 +153,6 @@
return msgCache;
}
- public boolean matches(String component, int errorCode) {
- Objects.requireNonNull(component, "component");
- return component.equals(this.component) && errorCode == this.errorCode;
- }
-
@Override
public String toString() {
return getLocalizedMessage();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java
index 0f873e1..f8b017b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.api.exceptions;
+import java.util.Objects;
+
public interface IFormattedException {
/**
@@ -40,4 +42,27 @@
* @return the exception message
*/
String getMessage();
+
+ /**
+ * Tests for matching component & errorCode against this exception
+ *
+ * @param component the component to match
+ * @param errorCode the errorCode to match
+ * @return <code>true</code> if this {@link IFormattedException} instance matches the supplied parameters
+ */
+ default boolean matches(String component, int errorCode) {
+ Objects.requireNonNull(component, "component");
+ return component.equals(getComponent()) && errorCode == getErrorCode();
+ }
+
+ /**
+ * Tests for matching component & errorCode against supplied throwable
+ *
+ * @param component the component to match
+ * @param errorCode the errorCode to match
+ * @return <code>true</code> if the supplied {@link Throwable} matches the supplied parameters
+ */
+ static boolean matches(Throwable th, String component, int errorCode) {
+ return th instanceof IFormattedException && ((IFormattedException) th).matches(component, errorCode);
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 5d52ed9..6ecb677 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -270,7 +270,7 @@
IDelay delay, IFailedAttemptCallback onFailure) throws HyracksDataException {
Throwable failure;
int attempt = 0;
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
attempt++;
try {
return action.compute();
@@ -291,6 +291,7 @@
}
}
}
+ throw HyracksDataException.create(new InterruptedException());
}
@FunctionalInterface
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index cefada7..1cfc14c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -70,7 +70,7 @@
@Override
public int getTupleStartOffset(int tupleIndex) {
int offset = tupleIndex == 0 ? FrameConstants.TUPLE_START_OFFSET
- : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
+ : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * tupleIndex);
return start + offset;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
index be9874a..4f73680 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
@@ -40,4 +40,10 @@
};
}
+ static <R> ThrowingFunction<R, Void> asFunction(ThrowingConsumer<R> consumer) {
+ return input -> {
+ consumer.process(input);
+ return null;
+ };
+ }
}