[ASTERIXDB-2008][CLUS] Only add pending removal if node known
[ASTERIXDB-2023][ING] Introduce Enums instead of using bytes
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Only nodes which are known to the cluster manager are added
to the list of nodes pending removal. Other nodes are ignored.
- Enums introduced:
- ActiveEvent.Kind
- ActivePartitionMessage.Event
- Remove AdapterRuntimeManager
- Remove AdapterExecutor
Change-Id: I7044896559798426c04a3f46861bc5335b25d140
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1921
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 3dd24b6..6568795 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,10 +31,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index fcf2be9..c0717b9 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -23,14 +23,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveStatsResponse;
import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -38,21 +40,20 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.log4j.Logger;
public class ActiveManager {
private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName());
private static final int SHUTDOWN_TIMEOUT_SECS = 60;
- private final ThreadExecutor executor;
+ private final ExecutorService executor;
private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
private final ConcurrentFramePool activeFramePool;
private final String nodeId;
private final INCServiceContext serviceCtx;
private volatile boolean shutdown;
- public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize,
+ public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget, int frameSize,
INCServiceContext serviceCtx) throws HyracksDataException {
this.executor = executor;
this.nodeId = nodeId;
@@ -86,15 +87,16 @@
}
public void submit(ActiveManagerMessage message) throws HyracksDataException {
+ LOGGER.log(Level.INFO, "Message of type " + message.getKind() + " received in " + nodeId);
switch (message.getKind()) {
- case ActiveManagerMessage.STOP_ACTIVITY:
+ case STOP_ACTIVITY:
stopRuntime(message);
break;
- case ActiveManagerMessage.REQUEST_STATS:
+ case REQUEST_STATS:
requestStats((StatsRequestMessage) message);
break;
default:
- LOGGER.warn("Unknown message type received: " + message.getKind());
+ LOGGER.warning("Unknown message type received: " + message.getKind());
}
}
@@ -104,7 +106,7 @@
IActiveRuntime runtime = runtimes.get(runtimeId);
long reqId = message.getReqId();
if (runtime == null) {
- LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId);
+ LOGGER.warning("Request stats of a runtime that is not registered " + runtimeId);
// Send a failure message
((NodeControllerService) serviceCtx.getControllerService())
.sendApplicationMessageToCC(
@@ -124,7 +126,7 @@
}
public void shutdown() {
- LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+ LOGGER.warning("Shutting down ActiveManager on node " + nodeId);
Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
shutdown = true;
runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, executor.submit(() -> {
@@ -136,29 +138,29 @@
try {
entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.warn("Interrupted waiting to stop runtime: " + entry.getKey());
+ LOGGER.warning("Interrupted waiting to stop runtime: " + entry.getKey());
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
- LOGGER.warn("Exception while stopping runtime: " + entry.getKey(), e);
+ LOGGER.log(Level.WARNING, "Exception while stopping runtime: " + entry.getKey(), e);
} catch (TimeoutException e) {
- LOGGER.warn("Timed out waiting to stop runtime: " + entry.getKey(), e);
+ LOGGER.log(Level.WARNING, "Timed out waiting to stop runtime: " + entry.getKey(), e);
}
});
- LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+ LOGGER.warning("Shutdown ActiveManager on node " + nodeId + " complete");
}
private void stopRuntime(ActiveManagerMessage message) {
ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
IActiveRuntime runtime = runtimes.get(runtimeId);
if (runtime == null) {
- LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId);
+ LOGGER.warning("Request to stop a runtime that is not registered " + runtimeId);
} else {
executor.execute(() -> {
try {
stopIfRunning(runtimeId, runtime);
} catch (Exception e) {
// TODO(till) Figure out a better way to handle failure to stop a runtime
- LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+ LOGGER.log(Level.WARNING, "Failed to stop runtime: " + runtimeId, e);
}
});
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index a7d7796..27ecb52 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -22,6 +22,7 @@
import java.util.logging.Logger;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -36,6 +37,7 @@
protected final IHyracksTaskContext ctx;
protected final ActiveManager activeManager;
/** A unique identifier for the runtime **/
+ protected Thread taskThread;
protected final ActiveRuntimeId runtimeId;
private volatile boolean done = false;
@@ -85,11 +87,12 @@
@Override
public final void initialize() throws HyracksDataException {
LOGGER.log(Level.INFO, "initialize() called on ActiveSourceOperatorNodePushable");
+ taskThread = Thread.currentThread();
activeManager.registerRuntime(this);
try {
// notify cc that runtime has been registered
ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
- ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), null);
+ Event.RUNTIME_REGISTERED, null), null);
start();
} catch (InterruptedException e) {
LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e);
@@ -112,7 +115,7 @@
activeManager.deregisterRuntime(runtimeId);
try {
ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
- ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), null);
+ Event.RUNTIME_DEREGISTERED, null), null);
} catch (Exception e) {
LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e);
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
index 0a36216..de6682d 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
@@ -18,15 +18,12 @@
*/
package org.apache.asterix.active;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class SingleThreadEventProcessor<T> implements Runnable {
@@ -34,20 +31,20 @@
private static final Logger LOGGER = Logger.getLogger(SingleThreadEventProcessor.class.getName());
private final String name;
private final LinkedBlockingQueue<T> eventInbox;
- private final ExecutorService executorService;
- private final Future<?> future;
+ private volatile Thread executorThread;
+ private volatile boolean stopped = false;
public SingleThreadEventProcessor(String threadName) {
this.name = threadName;
eventInbox = new LinkedBlockingQueue<>();
- executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
- future = executorService.submit(this);
+ executorThread = new Thread(this, threadName);
+ executorThread.start();
}
@Override
public final void run() {
LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
- while (!Thread.currentThread().isInterrupted()) {
+ while (!stopped) {
try {
T event = eventInbox.take();
handle(event);
@@ -69,10 +66,19 @@
}
public void stop() throws HyracksDataException, InterruptedException {
- future.cancel(true);
- executorService.shutdown();
- if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
- throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+ stopped = true;
+ executorThread.interrupt();
+ executorThread.join(1000);
+ int attempt = 0;
+ while (executorThread.isAlive()) {
+ attempt++;
+ LOGGER.log(Level.WARNING,
+ "Failed to stop event processor after " + attempt + " attempts. Interrupted exception swallowed?");
+ if (attempt == 10) {
+ throw new RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+ }
+ executorThread.interrupt();
+ executorThread.join(1000);
}
}
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 9772698..bef418b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -26,14 +26,16 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ActiveManagerMessage implements INcAddressedMessage {
- public static final byte STOP_ACTIVITY = 0x00;
- public static final byte REQUEST_STATS = 0x01;
+ public enum Kind {
+ STOP_ACTIVITY,
+ REQUEST_STATS
+ }
private static final long serialVersionUID = 1L;
- private final byte kind;
+ private final Kind kind;
private final Serializable payload;
- public ActiveManagerMessage(byte kind, Serializable payload) {
+ public ActiveManagerMessage(Kind kind, Serializable payload) {
this.kind = kind;
this.payload = payload;
}
@@ -42,7 +44,7 @@
return payload;
}
- public byte getKind() {
+ public Kind getKind() {
return kind;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index a47d5a5..9ace417 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -29,17 +29,19 @@
import org.apache.hyracks.api.job.JobId;
public class ActivePartitionMessage implements ICcAddressedMessage {
+ public enum Event {
+ RUNTIME_REGISTERED,
+ RUNTIME_DEREGISTERED,
+ GENERIC_EVENT
+ }
private static final long serialVersionUID = 1L;
- public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
- public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
- public static final byte GENERIC_EVENT = 0x02;
private final ActiveRuntimeId activeRuntimeId;
private final JobId jobId;
private final Serializable payload;
- private final byte event;
+ private final Event event;
- public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event, Serializable payload) {
+ public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload) {
this.activeRuntimeId = activeRuntimeId;
this.jobId = jobId;
this.event = event;
@@ -58,7 +60,7 @@
return payload;
}
- public byte getEvent() {
+ public Event getEvent() {
return event;
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
index 8fa5f19..d43f00e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
@@ -24,8 +24,8 @@
private static final long serialVersionUID = 1L;
private final long reqId;
- public StatsRequestMessage(byte kind, Serializable payload, long reqId) {
- super(kind, payload);
+ public StatsRequestMessage(Serializable payload, long reqId) {
+ super(Kind.REQUEST_STATS, payload);
this.reqId = reqId;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
index 1fea840..18ef143 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
@@ -18,10 +18,11 @@
*/
package org.apache.asterix.api.common;
+import java.util.concurrent.ExecutorService;
+
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.api.io.IIOManager;
@@ -84,7 +85,7 @@
}
@Override
- public ThreadExecutor getThreadExecutor() {
+ public ExecutorService getThreadExecutor() {
return asterixAppRuntimeContext.getThreadExecutor();
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 995e372..acb1614 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,8 +38,8 @@
import org.apache.asterix.active.IRetryPolicy;
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
-import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.active.message.StatsRequestMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.IMetadataLockManager;
@@ -68,6 +68,7 @@
public abstract class ActiveEntityEventsListener implements IActiveEntityController {
private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName());
+ private static final Level level = Level.INFO;
private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null);
private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING,
ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING);
@@ -130,7 +131,7 @@
}
protected synchronized void setState(ActivityState newState) {
- LOGGER.log(Level.FINE, "State is being set to " + newState + " from " + state);
+ LOGGER.log(level, "State of " + getEntityId() + "is being set to " + newState + " from " + state);
this.prevState = state;
this.state = newState;
if (newState == ActivityState.SUSPENDED) {
@@ -142,7 +143,7 @@
@Override
public synchronized void notify(ActiveEvent event) {
try {
- LOGGER.fine("EventListener is notified.");
+ LOGGER.log(level, "EventListener is notified.");
ActiveEvent.Kind eventKind = event.getEventKind();
switch (eventKind) {
case JOB_CREATED:
@@ -172,22 +173,24 @@
}
protected synchronized void handle(ActivePartitionMessage message) {
- if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+ if (message.getEvent() == Event.RUNTIME_REGISTERED) {
numRegistered++;
if (numRegistered == locations.getLocations().length) {
setState(ActivityState.RUNNING);
}
- } else if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) {
+ } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
numRegistered--;
}
}
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
+ LOGGER.log(level, "the job " + jobId + " finished");
jobId = null;
Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
+ LOGGER.log(level, "The job finished with status: " + jobStatus);
if (jobStatus.equals(JobStatus.FAILURE)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
@@ -271,10 +274,10 @@
@SuppressWarnings("unchecked")
@Override
public void refreshStats(long timeout) throws HyracksDataException {
- LOGGER.log(Level.FINE, "refreshStats called");
+ LOGGER.log(level, "refreshStats called");
synchronized (this) {
if (state != ActivityState.RUNNING || isFetchingStats) {
- LOGGER.log(Level.FINE,
+ LOGGER.log(level,
"returning immediately since state = " + state + " and fetchingStats = " + isFetchingStats);
return;
} else {
@@ -287,8 +290,7 @@
List<INcAddressedMessage> requests = new ArrayList<>();
List<String> ncs = Arrays.asList(locations.getLocations());
for (int i = 0; i < ncs.size(); i++) {
- requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
- new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+ requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
}
try {
List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
@@ -348,32 +350,32 @@
@Override
public synchronized void recover() throws HyracksDataException {
- LOGGER.log(Level.FINE, "Recover is called on " + entityId);
+ LOGGER.log(level, "Recover is called on " + entityId);
if (recoveryTask != null) {
- LOGGER.log(Level.FINE, "But recovery task for " + entityId + " is already there!! throwing an exception");
+ LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception");
throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
}
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
- LOGGER.log(Level.FINE, "But it has no recovery policy, so it is set to permanent failure");
+ LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
setState(ActivityState.PERMANENTLY_FAILED);
} else {
ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
IRetryPolicy policy = retryPolicyFactory.create(this);
cancelRecovery = false;
setState(ActivityState.TEMPORARILY_FAILED);
- LOGGER.log(Level.FINE, "Recovery task has been submitted");
+ LOGGER.log(level, "Recovery task has been submitted");
recoveryTask = executor.submit(() -> doRecover(policy));
}
}
protected Void doRecover(IRetryPolicy policy)
throws AlgebricksException, HyracksDataException, InterruptedException {
- LOGGER.log(Level.FINE, "Actual Recovery task has started");
+ LOGGER.log(level, "Actual Recovery task has started");
if (getState() != ActivityState.TEMPORARILY_FAILED) {
- LOGGER.log(Level.FINE, "but its state is not temp failure and so we're just returning");
+ LOGGER.log(level, "but its state is not temp failure and so we're just returning");
return null;
}
- LOGGER.log(Level.FINE, "calling the policy");
+ LOGGER.log(level, "calling the policy");
while (policy.retry()) {
synchronized (this) {
if (cancelRecovery) {
@@ -402,7 +404,7 @@
doStart(metadataProvider);
return null;
} catch (Exception e) {
- LOGGER.log(Level.WARNING, "Attempt to revive " + entityId + " failed", e);
+ LOGGER.log(level, "Attempt to revive " + entityId + " failed", e);
setState(ActivityState.TEMPORARILY_FAILED);
recoverFailure = e;
} finally {
@@ -515,10 +517,10 @@
WaitForStateSubscriber subscriber;
Future<Void> suspendTask;
synchronized (this) {
- LOGGER.log(Level.FINE, "suspending entity " + entityId);
- LOGGER.log(Level.FINE, "Waiting for ongoing activities");
+ LOGGER.log(level, "suspending entity " + entityId);
+ LOGGER.log(level, "Waiting for ongoing activities");
waitForNonTransitionState();
- LOGGER.log(Level.FINE, "Proceeding with suspension. Current state is " + state);
+ LOGGER.log(level, "Proceeding with suspension. Current state is " + state);
if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
suspended = true;
return;
@@ -536,12 +538,12 @@
EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED));
suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
.getExecutor().submit(() -> doSuspend(metadataProvider));
- LOGGER.log(Level.FINE, "Suspension task has been submitted");
+ LOGGER.log(level, "Suspension task has been submitted");
}
try {
- LOGGER.log(Level.FINE, "Waiting for suspension task to complete");
+ LOGGER.log(level, "Waiting for suspension task to complete");
suspendTask.get();
- LOGGER.log(Level.FINE, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
+ LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
subscriber.sync();
} catch (Exception e) {
synchronized (this) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index c5e5dbb..d36d9b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -53,7 +53,7 @@
implements IActiveNotificationHandler, IJobLifecycleListener {
private static final Logger LOGGER = Logger.getLogger(ActiveNotificationHandler.class.getName());
- private static final boolean DEBUG = false;
+ private static final Level level = Level.INFO;
public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
private final Map<JobId, EntityId> jobId2EntityId;
@@ -73,13 +73,13 @@
EntityId entityId = jobId2EntityId.get(event.getJobId());
if (entityId != null) {
IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
- LOGGER.log(Level.FINE, "Next event is of type " + event.getEventKind());
+ LOGGER.log(level, "Next event is of type " + event.getEventKind());
if (event.getEventKind() == Kind.JOB_FINISHED) {
- LOGGER.log(Level.FINE, "Removing the job");
+ LOGGER.log(level, "Removing the job");
jobId2EntityId.remove(event.getJobId());
}
if (listener != null) {
- LOGGER.log(Level.FINE, "Notifying the listener");
+ LOGGER.log(level, "Notifying the listener");
listener.notify(event);
}
} else {
@@ -91,34 +91,30 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
- LOGGER.log(Level.FINE,
+ LOGGER.log(level,
"notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId);
Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (property == null || !(property instanceof EntityId)) {
- LOGGER.log(Level.FINE, "Job is not of type active job. property found to be: " + property);
+ LOGGER.log(level, "Job is not of type active job. property found to be: " + property);
return;
}
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
boolean found = jobId2EntityId.get(jobId) != null;
- LOGGER.log(Level.FINE, "Job was found to be: " + (found ? "Active" : "Inactive"));
+ LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
}
private synchronized void monitorJob(JobId jobId, EntityId entityId) {
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
- boolean found = jobId2EntityId.get(jobId) != null;
- LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
- }
+ LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
+ boolean found = jobId2EntityId.get(jobId) != null;
+ LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
if (entityEventListeners.containsKey(entityId)) {
if (jobId2EntityId.containsKey(jobId)) {
LOGGER.severe("Job is already being monitored for job: " + jobId);
return;
}
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId);
- }
+ LOGGER.log(level, "monitoring started for job id: " + jobId);
} else {
LOGGER.info("No listener was found for the entity: " + entityId);
}
@@ -140,9 +136,7 @@
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
} else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
- }
+ LOGGER.log(level, "NO NEED TO NOTIFY JOB FINISH!");
}
}
@@ -156,20 +150,16 @@
@Override
public IActiveEntityEventsListener getListener(EntityId entityId) {
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
- IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
- LOGGER.log(Level.WARNING, "Listener found: " + listener);
- }
+ LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
+ IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
+ LOGGER.log(level, "Listener found: " + listener);
return entityEventListeners.get(entityId);
}
@Override
public synchronized IActiveEntityEventsListener[] getEventListeners() {
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "getEventListeners() was called");
- LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners");
- }
+ LOGGER.log(level, "getEventListeners() was called");
+ LOGGER.log(level, "returning " + entityEventListeners.size() + " Listeners");
return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]);
}
@@ -178,11 +168,8 @@
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- if (DEBUG) {
- LOGGER.log(Level.WARNING,
- "registerListener(IActiveEntityEventsListener listener) was called for the entity "
- + listener.getEntityId());
- }
+ LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
+ + listener.getEntityId());
if (entityEventListeners.containsKey(listener.getEntityId())) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId());
}
@@ -194,7 +181,7 @@
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- LOGGER.log(Level.FINE, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+ LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+ listener.getEntityId());
IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
if (registeredListener == null) {
@@ -221,16 +208,16 @@
@Override
public synchronized void recover() throws HyracksDataException {
- LOGGER.log(Level.FINE, "Starting active recovery");
+ LOGGER.log(level, "Starting active recovery");
for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
synchronized (listener) {
- LOGGER.log(Level.FINE, "Entity " + listener.getEntityId() + " is " + listener.getStats());
+ LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats());
if (listener.getState() == ActivityState.PERMANENTLY_FAILED
&& listener instanceof IActiveEntityController) {
- LOGGER.log(Level.FINE, "Recovering");
+ LOGGER.log(level, "Recovering");
((IActiveEntityController) listener).recover();
} else {
- LOGGER.log(Level.FINE, "Only notifying");
+ LOGGER.log(level, "Only notifying");
listener.notifyAll();
}
}
@@ -243,7 +230,7 @@
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
}
- LOGGER.log(Level.FINE, "Suspending active events handler");
+ LOGGER.log(level, "Suspending active events handler");
suspended = true;
}
IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager();
@@ -253,27 +240,27 @@
// exclusive lock all the datasets
String dataverseName = listener.getEntityId().getDataverse();
String entityName = listener.getEntityId().getEntityName();
- LOGGER.log(Level.FINE, "Suspending " + listener.getEntityId());
- LOGGER.log(Level.FINE, "Acquiring locks");
+ LOGGER.log(level, "Suspending " + listener.getEntityId());
+ LOGGER.log(level, "Acquiring locks");
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName);
List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
for (Dataset dataset : datasets) {
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
DatasetUtil.getFullyQualifiedName(dataset));
}
- LOGGER.log(Level.FINE, "locks acquired");
+ LOGGER.log(level, "locks acquired");
((ActiveEntityEventsListener) listener).suspend(mdProvider);
- LOGGER.log(Level.FINE, listener.getEntityId() + " suspended");
+ LOGGER.log(level, listener.getEntityId() + " suspended");
}
}
public void resume(MetadataProvider mdProvider)
throws AsterixException, HyracksDataException, InterruptedException {
- LOGGER.log(Level.FINE, "Resuming active events handler");
+ LOGGER.log(level, "Resuming active events handler");
for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
- LOGGER.log(Level.FINE, "Resuming " + listener.getEntityId());
+ LOGGER.log(level, "Resuming " + listener.getEntityId());
((ActiveEntityEventsListener) listener).resume(mdProvider);
- LOGGER.log(Level.FINE, listener.getEntityId() + " resumed");
+ LOGGER.log(level, listener.getEntityId() + " resumed");
}
synchronized (this) {
suspended = false;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 45c79a0..124e56e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -114,7 +114,7 @@
@Override
protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException {
IActiveEntityEventSubscriber eventSubscriber =
- new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED));
+ new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
try {
// Construct ActiveMessage
for (int i = 0; i < getLocations().getLocations().length; i++) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 9faa9e9..e7919fa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -137,7 +137,6 @@
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
responseMsg.setError(new Exception(e.toString()));
}
-
try {
messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 29bc95e..7647881 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -24,6 +24,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,7 +33,6 @@
import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.AsterixExtension;
@@ -113,7 +114,7 @@
private ReplicationProperties replicationProperties;
private MessagingProperties messagingProperties;
private final NodeProperties nodeProperties;
- private ThreadExecutor threadExecutor;
+ private ExecutorService threadExecutor;
private IDatasetLifecycleManager datasetLifecycleManager;
private IBufferCache bufferCache;
private ITransactionSubsystem txnSubsystem;
@@ -164,7 +165,7 @@
@Override
public void initialize(boolean initialRun) throws IOException, ACIDException {
ioManager = getServiceContext().getIoManager();
- threadExecutor = new ThreadExecutor(getServiceContext().getThreadFactory());
+ threadExecutor = Executors.newCachedThreadPool(getServiceContext().getThreadFactory());
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
@@ -383,7 +384,7 @@
}
@Override
- public ThreadExecutor getThreadExecutor() {
+ public ExecutorService getThreadExecutor() {
return threadExecutor;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 452d13e..56975d1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -107,7 +107,7 @@
}
app.append("\r\n");
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 09c4983..9fc9940 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -31,6 +31,7 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -272,8 +273,9 @@
}
// make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
- Pair<IOperatorDescriptor, Integer>>> entry : subJob.getConnectorOperatorMap().entrySet()) {
+ for (Entry<ConnectorDescriptorId,
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry
+ : subJob.getConnectorOperatorMap().entrySet()) {
ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey());
IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId);
Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
@@ -381,7 +383,7 @@
public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation,
Integer partition) throws Exception {
- ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY,
+ ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(Kind.STOP_ACTIVITY,
new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
index 71cb038..74c4364 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
@@ -21,7 +21,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-abstract class Action {
+public abstract class Action {
boolean done = false;
HyracksDataException failure;
@@ -39,21 +39,21 @@
protected abstract void doExecute(MetadataProvider mdProvider) throws Exception;
- boolean hasFailed() {
+ public boolean hasFailed() {
return failure != null;
}
- HyracksDataException getFailure() {
+ public HyracksDataException getFailure() {
return failure;
}
- synchronized void sync() throws InterruptedException {
+ public synchronized void sync() throws InterruptedException {
while (!done) {
wait();
}
}
- boolean isDone() {
+ public boolean isDone() {
return done;
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index f8baa0e..e1fdb69 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -30,6 +30,7 @@
import org.apache.asterix.active.IActiveRuntime;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.algebra.base.ILangExtension.Language;
import org.apache.asterix.app.active.ActiveEntityEventsListener;
import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -126,8 +127,8 @@
requestedStats = eventsListener.getStats();
Assert.assertTrue(requestedStats.contains("N/A"));
// Fake partition message and notify eventListener
- ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId,
- ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+ ActivePartitionMessage partitionMessage =
+ new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null);
partitionMessage.handle(appCtx);
start.sync();
if (start.hasFailed()) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 3f68651..8d21b55 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -21,7 +21,7 @@
import org.apache.asterix.active.SingleThreadEventProcessor;
import org.apache.asterix.metadata.declared.MetadataProvider;
-class Actor extends SingleThreadEventProcessor<Action> {
+public class Actor extends SingleThreadEventProcessor<Action> {
private final MetadataProvider actorMdProvider;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
index e7e21b6..99499a3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -23,6 +23,7 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.hyracks.api.job.JobId;
@@ -41,9 +42,8 @@
Action registration = new Action() {
@Override
protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
- ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId,
- new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId,
- ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null));
+ ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+ new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_REGISTERED, null));
clusterController.activeEvent(event);
}
};
@@ -55,9 +55,8 @@
Action registration = new Action() {
@Override
protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
- ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId,
- new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId,
- ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null));
+ ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+ new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null));
clusterController.activeEvent(event);
}
};
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 1c29e86..a96adc0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -103,13 +103,13 @@
// see
// https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184
private static final long MAX_URL_LENGTH = 2000l;
- private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/",
- Pattern.MULTILINE | Pattern.DOTALL);
+ private static final Pattern JAVA_BLOCK_COMMENT_PATTERN =
+ Pattern.compile("/\\*.*\\*/", Pattern.MULTILINE | Pattern.DOTALL);
private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("^//.*$", Pattern.MULTILINE);
private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("^#.*$", Pattern.MULTILINE);
private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$");
- private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)",
- Pattern.MULTILINE);
+ private static final Pattern POLL_TIMEOUT_PATTERN =
+ Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)");
private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)");
@@ -168,10 +168,10 @@
public void runScriptAndCompareWithResult(File scriptFile, PrintWriter print, File expectedFile, File actualFile,
ComparisonEnum compare) throws Exception {
System.err.println("Expected results file: " + expectedFile.toString());
- BufferedReader readerExpected = new BufferedReader(
- new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
- BufferedReader readerActual = new BufferedReader(
- new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
+ BufferedReader readerExpected =
+ new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
+ BufferedReader readerActual =
+ new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
boolean regex = false;
try {
if (ComparisonEnum.BINARY.equals(compare)) {
@@ -354,10 +354,10 @@
public void runScriptAndCompareWithResultRegex(File scriptFile, File expectedFile, File actualFile)
throws Exception {
String lineExpected, lineActual;
- try (BufferedReader readerExpected = new BufferedReader(
- new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
- BufferedReader readerActual = new BufferedReader(
- new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) {
+ try (BufferedReader readerExpected =
+ new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
+ BufferedReader readerActual =
+ new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) {
StringBuilder actual = new StringBuilder();
while ((lineActual = readerActual.readLine()) != null) {
actual.append(lineActual).append('\n');
@@ -534,7 +534,7 @@
return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false);
}
- protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
+ public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
final List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams)
@@ -697,8 +697,8 @@
// Insert and Delete statements are executed here
public void executeUpdate(String str, URI uri) throws Exception {
// Create a method instance.
- HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
- .build();
+ HttpUriRequest request =
+ RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build();
// Execute the method.
executeAndCheckHttpRequest(request);
@@ -708,10 +708,10 @@
public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri,
Map<String, Object> variableCtx) throws Exception {
// Create a method instance.
- HttpUriRequest request = RequestBuilder.post(uri)
- .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
- .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType())
- .build();
+ HttpUriRequest request =
+ RequestBuilder.post(uri).addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
+ .setEntity(new StringEntity(statement, StandardCharsets.UTF_8))
+ .setHeader("Accept", fmt.mimeType()).build();
String handleVar = getHandleVariable(statement);
@@ -737,8 +737,8 @@
// create function statement
public void executeDDL(String str, URI uri) throws Exception {
// Create a method instance.
- HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
- .build();
+ HttpUriRequest request =
+ RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build();
// Execute the method.
executeAndCheckHttpRequest(request);
@@ -748,8 +748,8 @@
// and returns the contents as a string
// This string is later passed to REST API for execution.
public String readTestFile(File testFile) throws Exception {
- BufferedReader reader = new BufferedReader(
- new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8));
+ BufferedReader reader =
+ new BufferedReader(new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8));
String line;
StringBuilder stringBuilder = new StringBuilder();
String ls = System.getProperty("line.separator");
@@ -804,8 +804,8 @@
private static String getProcessOutput(Process p) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Future<Integer> future = Executors.newSingleThreadExecutor()
- .submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() {
+ Future<Integer> future =
+ Executors.newSingleThreadExecutor().submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() {
@Override
public void write(int b) throws IOException {
baos.write(b);
@@ -1357,7 +1357,7 @@
if (failedGroup != null) {
failedGroup.getTestCase().add(testCaseCtx.getTestCase());
}
- throw new Exception("Test \"" + testFile + "\" FAILED!");
+ throw new Exception("Test \"" + testFile + "\" FAILED!", e);
}
} finally {
if (numOfFiles == testFileCtxs.size()) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
index d0b0ea0..0fcdf15 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
@@ -4,8 +4,7 @@
"adapter-stats" : {
"incoming-records-count" : 13,
"failed-at-parser-records-count" : 3
- },
- "executor-restart-times" : 0
+ }
} ]
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3da58e9..8abbeab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -438,14 +438,15 @@
try {
dsInfo.wait();
} catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
}
}
}
try {
flushDatasetOpenIndexes(dsInfo, false);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
if (iInfo.isOpen()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 94fe951..5aed5f2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -228,6 +228,7 @@
public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108;
public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
+ public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
// Lifecycle management errors
public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index a74898e..a4c41df 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -120,7 +120,7 @@
try {
return new ObjectMapper().writeValueAsString(this);
} catch (JsonProcessingException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -128,7 +128,7 @@
try {
return new ObjectMapper().readValue(json, Checkpoint.class);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
index 49f5457..229fb6d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
@@ -18,9 +18,10 @@
*/
package org.apache.asterix.common.transactions;
+import java.util.concurrent.ExecutorService;
+
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -29,7 +30,7 @@
public interface IAppRuntimeContextProvider {
- ThreadExecutor getThreadExecutor();
+ ExecutorService getThreadExecutor();
IBufferCache getBufferCache();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 2e98abd..824e30b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -126,7 +126,7 @@
}
return file;
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 4cc06a6..a7a1990 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -217,6 +217,7 @@
3107 = Active Notification Handler is already suspended
3108 = Feed stopped while waiting for a new record
3109 = Function %1$s is being used. It cannot be dropped.
+3110 = Feed failed while getting a new record
# Lifecycle management errors
4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index a31b46d..b51416a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -74,7 +74,7 @@
return new LookupAdapter<>(dataParser, reader, inRecDesc, ridReader, retainInput, retainMissing,
isMissingWriterFactory, ctx, writer);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index def0bf1..7412338 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -18,27 +18,29 @@
*/
package org.apache.asterix.external.api;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+@FunctionalInterface
public interface IDataFlowController {
- //TODO: Refactor this interface. Remove writer from start() signature
public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
public default boolean pause() throws HyracksDataException {
- throw new HyracksDataException("Method not implemented");
+ throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
public default boolean resume() throws HyracksDataException {
- throw new HyracksDataException("Method not implemented");
+ throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
public default void flush() throws HyracksDataException {
- throw new HyracksDataException("Method not implemented");
+ throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
public default boolean stop() throws HyracksDataException {
- throw new HyracksDataException("Method not implemented");
+ throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
index e62672d..472cdae 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.external.api;
-import java.io.Serializable;
-
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,7 +28,7 @@
* adapter(pull or push).
*/
@FunctionalInterface
-public interface IDataSourceAdapter extends Serializable {
+public interface IDataSourceAdapter {
public enum AdapterType {
INTERNAL,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index c87fe2d..53fa137 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -57,14 +57,5 @@
tupleForwarder.flush();
}
- @Override
- public abstract boolean stop() throws HyracksDataException;
-
- public abstract boolean handleException(Throwable th) throws HyracksDataException;
-
public abstract String getStats();
-
- public void fail() throws HyracksDataException {
- tupleForwarder.fail();
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 5b9b96f..e24c26d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -32,16 +34,20 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
+ public enum State {
+ CREATED,
+ STARTED,
+ STOPPED
+ }
+
private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());
private final IRecordDataParser<T> dataParser;
private final IRecordReader<T> recordReader;
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected static final long INTERVAL = 1000;
- protected boolean failed = false;
+ protected State state = State.CREATED;
protected long incomingRecordsCount = 0;
protected long failedRecordsCount = 0;
@@ -57,8 +63,15 @@
@Override
public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException {
+ synchronized (this) {
+ if (state == State.STOPPED) {
+ return;
+ } else {
+ setState(State.STARTED);
+ }
+ }
+ Exception failure = null;
try {
- failed = false;
tupleForwarder.initialize(ctx, writer);
while (hasNext()) {
IRawRecord<? extends T> record = next();
@@ -74,21 +87,48 @@
}
}
} catch (HyracksDataException e) {
- LOGGER.log(Level.WARN, e);
+ LOGGER.log(Level.WARNING, "Exception during ingestion", e);
//if interrupted while waiting for a new record, then it is safe to not fail forward
if (e.getComponent() == ErrorCode.ASTERIX
- && e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD) {
- // Do nothing
+ && (e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD)) {
+ // Do nothing. Interrupted by the active manager while waiting for a new record; safe to stop quietly.
+ } else if (e.getComponent() == ErrorCode.ASTERIX
+ && (e.getErrorCode() == ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
+ // Failure but we know we can for sure push the previously parsed records safely
+ failure = e;
+ try {
+ flush();
+ } catch (Exception flushException) {
+ tupleForwarder.fail();
+ flushException.addSuppressed(e);
+ failure = flushException;
+ }
} else {
- failed = true;
- throw e;
+ failure = e;
+ tupleForwarder.fail();
}
} catch (Exception e) {
- failed = true;
- LOGGER.warn("Failure while operating a feed source", e);
- throw HyracksDataException.create(e);
+ failure = e;
+ tupleForwarder.fail();
+ LOGGER.log(Level.WARNING, "Failure while operating a feed source", e);
+ } finally {
+ failure = finish(failure);
}
- finish();
+ if (failure != null) {
+ if (failure instanceof InterruptedException) {
+ throw (InterruptedException) failure;
+ }
+ throw HyracksDataException.create(failure);
+ }
+ }
+
+ private synchronized void setState(State newState) {
+ LOGGER.log(Level.INFO, "State is being set from " + state + " to " + newState);
+ state = newState;
+ }
+
+ public synchronized State getState() {
+ return state;
}
private IRawRecord<? extends T> next() throws HyracksDataException {
@@ -97,47 +137,58 @@
} catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
} catch (Exception e) {
- throw HyracksDataException.create(e);
+ if (!recordReader.handleException(e)) {
+ throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
+ }
+ return null;
}
}
private boolean hasNext() throws HyracksDataException {
- boolean hasNext;
- try {
- hasNext = recordReader.hasNext();
- } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
- throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ while (true) {
+ try {
+ return recordReader.hasNext();
+ } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+ throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+ } catch (Exception e) {
+ if (!recordReader.handleException(e)) {
+ throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
+ }
+ }
}
- return hasNext;
}
- private void finish() throws HyracksDataException {
+ private Exception finish(Exception failure) {
HyracksDataException hde = null;
try {
- tupleForwarder.close();
- } catch (Throwable th) {
+ recordReader.close();
+ } catch (Exception th) {
+ LOGGER.log(Level.WARNING, "Failure while operating a feed source", th);
hde = HyracksDataException.suppress(hde, th);
}
try {
- recordReader.close();
- } catch (Throwable th) {
- LOGGER.warn("Failure during while operating a feed sourcec", th);
+ tupleForwarder.close();
+ } catch (Exception th) {
hde = HyracksDataException.suppress(hde, th);
} finally {
closeSignal();
}
+ setState(State.STOPPED);
if (hde != null) {
- throw hde;
+ if (failure != null) {
+ failure.addSuppressed(hde);
+ } else {
+ return hde;
+ }
}
+ return failure;
}
private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException {
try {
dataParser.parse(record, tb.getDataOutput());
} catch (Exception e) {
- LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+ LOGGER.log(Level.WARNING, ExternalDataConstants.ERROR_PARSE_RECORD, e);
feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
// continue the outer loop
return false;
@@ -172,44 +223,31 @@
@Override
public boolean stop() throws HyracksDataException {
- HyracksDataException hde = null;
+ synchronized (this) {
+ switch (state) {
+ case CREATED:
+ case STOPPED:
+ setState(State.STOPPED);
+ return true;
+ case STARTED:
+ break;
+ default:
+ throw new HyracksDataException("unknown state " + state);
+
+ }
+ }
if (recordReader.stop()) {
- if (failed) {
- // failed, close here
- try {
- tupleForwarder.close();
- } catch (Throwable th) {
- hde = HyracksDataException.suppress(hde, th);
- }
- try {
- recordReader.close();
- } catch (Throwable th) {
- hde = HyracksDataException.suppress(hde, th);
- }
- if (hde != null) {
- throw hde;
- }
- } else {
- try {
- waitForSignal();
- } catch (InterruptedException e) {
- throw HyracksDataException.create(e);
- }
+ try {
+ waitForSignal();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
}
return true;
}
return false;
}
- @Override
- public boolean handleException(Throwable th) throws HyracksDataException {
- // This is not a parser record. most likely, this error happened in the record reader.
- if (!recordReader.handleException(th)) {
- finish();
- }
- return !closed.get();
- }
-
public IRecordReader<T> getReader() {
return recordReader;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index cad11cd..1f1f545 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -43,8 +43,7 @@
try {
tupleForwarder.initialize(ctx, writer);
while (true) {
- tb.reset();
- if (!dataParser.parse(tb.getDataOutput())) {
+ if (!parseNext()) {
break;
}
tb.addFieldEndOffset();
@@ -52,12 +51,25 @@
incomingRecordsCount++;
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
} finally {
tupleForwarder.close();
}
}
+ private boolean parseNext() throws HyracksDataException {
+ while (true) {
+ try {
+ tb.reset();
+ return dataParser.parse(tb.getDataOutput());
+ } catch (Exception e) {
+ if (!handleException(e)) {
+ throw e;
+ }
+ }
+ }
+ }
+
@Override
public boolean stop() throws HyracksDataException {
try {
@@ -71,8 +83,7 @@
return false;
}
- @Override
- public boolean handleException(Throwable th) {
+ private boolean handleException(Throwable th) {
boolean handled = true;
try {
handled &= stream.handleException(th);
@@ -86,6 +97,7 @@
return handled;
}
+ @Override
public String getStats() {
return "{\"incoming-records-number\": " + incomingRecordsCount + "}";
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 3a8130b..f824b67 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -103,7 +103,7 @@
if (throwable != null) {
throwable.addSuppressed(e);
} else {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
} catch (Throwable th) {
if (throwable != null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index 2b06775..c4f75e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -32,8 +32,8 @@
private final IExternalIndexer indexer;
public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
- IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader,
- IExternalIndexer indexer) throws IOException {
+ IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, IExternalIndexer indexer)
+ throws IOException {
super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
this.indexer = indexer;
}
@@ -43,7 +43,7 @@
try {
indexer.index(tb);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index eb5527f..f34b77d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -71,7 +71,8 @@
try {
Thread.sleep(interTupleInterval);
} catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
}
}
boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 9f32a25..eeda80c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -24,7 +24,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class FeedAdapter implements IDataSourceAdapter {
- private static final long serialVersionUID = 1L;
private final AbstractFeedDataFlowController controller;
public FeedAdapter(AbstractFeedDataFlowController controller) {
@@ -40,10 +39,6 @@
return controller.stop();
}
- public boolean handleException(Throwable e) throws HyracksDataException {
- return controller.handleException(e);
- }
-
public boolean pause() throws HyracksDataException {
return controller.pause();
}
@@ -56,7 +51,7 @@
return controller.getStats();
}
- public void fail() throws HyracksDataException {
- controller.fail();
+ public AbstractFeedDataFlowController getController() {
+ return controller;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 0681d71..916fe0a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -25,7 +25,6 @@
public class GenericAdapter implements IDataSourceAdapter {
- private static final long serialVersionUID = 1L;
private final IDataFlowController controller;
public GenericAdapter(IDataFlowController controller) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
deleted file mode 100644
index d102d0c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-/**
- * The class in charge of executing feed adapters.
- */
-public class AdapterExecutor implements Runnable {
-
- private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
-
- private final IFrameWriter writer; // A writer that sends frames to multiple receivers (that can
- // increase or decrease at any time)
- private final FeedAdapter adapter; // The adapter
- private final AdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility -->
- private int restartCount = 0;
-
- public AdapterExecutor(IFrameWriter writer, FeedAdapter adapter, AdapterRuntimeManager adapterManager) {
- this.writer = writer;
- this.adapter = adapter;
- this.adapterManager = adapterManager;
- }
-
- @Override
- public void run() {
- // Start by getting the partition number from the manager
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Starting ingestion for partition:" + adapterManager.getPartition());
- }
- boolean failed = false;
- try {
- failed = doRun();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- failed = true;
- LOGGER.error("Unhandled Exception", e);
- } finally {
- // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying
- // the runtime manager
- adapterManager.setFailed(failed);
- adapterManager.setDone(true);
- synchronized (adapterManager) {
- adapterManager.notifyAll();
- }
- }
- }
-
- private boolean doRun() throws HyracksDataException, InterruptedException {
- boolean continueIngestion = true;
- boolean failedIngestion = false;
- while (continueIngestion) {
- try {
- // Start the adapter
- adapter.start(adapterManager.getPartition(), writer);
- // Adapter has completed execution
- continueIngestion = false;
- } catch (InterruptedException e) {
- adapter.fail();
- throw e;
- } catch (Exception e) {
- LOGGER.error("Exception during feed ingestion ", e);
- continueIngestion = adapter.handleException(e);
- if (!continueIngestion) {
- adapter.fail();
- }
- failedIngestion = !continueIngestion;
- restartCount++;
- }
- }
- return failedIngestion;
- }
-
- public String getStats() {
- return "{\"adapter-stats\": " + adapter.getStats() + ", \"executor-restart-times\": " + restartCount + "}";
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
deleted file mode 100644
index 1b5eeac..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * This class manages the execution of an adapter within a feed
- */
-public class AdapterRuntimeManager {
-
- private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
-
- private final EntityId feedId; // (dataverse-feed)
-
- private final FeedAdapter feedAdapter; // The adapter
-
- private final AdapterExecutor adapterExecutor; // The executor for the adapter
-
- private final int partition; // The partition number
-
- private final IHyracksTaskContext ctx;
-
- private Future<?> execution;
-
- private boolean started = false;
- private volatile boolean done = false;
- private volatile boolean failed = false;
-
- public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
- IFrameWriter writer, int partition) {
- this.ctx = ctx;
- this.feedId = entityId;
- this.feedAdapter = feedAdapter;
- this.partition = partition;
- this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this);
- }
-
- public void start() {
- synchronized (adapterExecutor) {
- started = true;
- if (!done) {
- execution = ctx.getExecutorService().submit(adapterExecutor);
- } else {
- LOGGER.log(Level.WARNING, "Someone stopped me before I even start. I will simply not start");
- }
- }
- }
-
- public void stop() throws HyracksDataException, InterruptedException {
- synchronized (adapterExecutor) {
- try {
- if (started) {
- try {
- ctx.getExecutorService().submit(() -> {
- if (feedAdapter.stop()) {
- execution.get();
- }
- return null;
- }).get(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e);
- throw e;
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e);
- throw HyracksDataException.create(e);
- } finally {
- execution.cancel(true);
- }
- } else {
- LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts");
- }
- } finally {
- done = true;
- }
- }
- }
-
- public EntityId getFeedId() {
- return feedId;
- }
-
- @Override
- public String toString() {
- return feedId + "[" + partition + "]";
- }
-
- public FeedAdapter getFeedAdapter() {
- return feedAdapter;
- }
-
- public AdapterExecutor getAdapterExecutor() {
- return adapterExecutor;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public boolean isFailed() {
- return failed;
- }
-
- public void setFailed(boolean failed) {
- this.failed = failed;
- }
-
- public boolean isDone() {
- return done;
- }
-
- public void setDone(boolean done) {
- this.done = done;
- }
-
- public String getStats() {
- return adapterExecutor.getStats();
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index cbf784e..982cf5b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -137,7 +137,7 @@
restoreConfig(ctx);
return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00ac090..c5ca129 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -21,16 +21,17 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
public class LocalFSInputStream extends AsterixInputStream {
@@ -155,24 +156,25 @@
if (in == null) {
return false;
}
- if (th instanceof HyracksDataException
- && ((HyracksDataException) th).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
+ Throwable root = ExceptionUtils.getRootCause(th);
+ if (root instanceof HyracksDataException
+ && ((HyracksDataException) root).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
if (currentFile != null) {
try {
logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
} catch (IOException e) {
- LOGGER.warn("Filed to write to feed log file", e);
+ LOGGER.log(Level.WARNING, "Failed to write to feed log file", e);
}
- LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
+ LOGGER.log(Level.WARNING, "Corrupted input file: " + currentFile.getAbsolutePath());
}
try {
advance();
return true;
} catch (Exception e) {
- LOGGER.warn("An exception was thrown while trying to skip a file", e);
+ LOGGER.log(Level.WARNING, "An exception was thrown while trying to skip a file", e);
}
}
- LOGGER.warn("Failed to recover from failure", th);
+ LOGGER.log(Level.WARNING, "Failed to recover from failure", th);
return false;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index 9a0e718..caeaa07 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -84,7 +84,7 @@
try {
return new SocketClientInputStream(sockets.get(partition));
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 05931b2..1f1fa5c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -124,7 +124,7 @@
server.bind(new InetSocketAddress(socket.second));
return new SocketServerInputStream(server);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index b32006c..12be449 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -97,7 +97,7 @@
try {
return new TwitterFirehoseInputStream(configuration, partition);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 0d485924..2b5e248 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -95,6 +97,11 @@
public class JObjectAccessors {
+ private static final Logger LOGGER = Logger.getLogger(JObjectAccessors.class.getName());
+
+ private JObjectAccessors() {
+ }
+
public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
IJObjectAccessor accessor = null;
switch (aTypeTag) {
@@ -200,18 +207,16 @@
@Override
public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objPool)
throws HyracksDataException {
- IJObject jObject = objPool.allocate(BuiltinType.ANULL);
- return jObject;
+ return objPool.allocate(BuiltinType.ANULL);
}
}
- public static class JMissingAccessor implements IJObjectAccessor {
+ public static class JMissingAccessor implements IJObjectAccessor {
@Override
public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objPool)
throws HyracksDataException {
- IJObject jObject = objPool.allocate(BuiltinType.AMISSING);
- return jObject;
+ return objPool.allocate(BuiltinType.AMISSING);
}
}
@@ -271,7 +276,7 @@
try {
v = reader.readUTF(new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1)));
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
JObjectUtil.getNormalizedString(v);
@@ -539,8 +544,8 @@
}
} catch (Exception e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
+ LOGGER.log(Level.WARNING, "Failure while accessing a java record", e);
+ throw HyracksDataException.create(e);
}
return jRecord;
}
@@ -593,7 +598,7 @@
list.add(listItem);
}
} catch (AsterixException exception) {
- throw new HyracksDataException(exception);
+ throw HyracksDataException.create(exception);
}
return list;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index 45d424e..242773e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -89,7 +89,7 @@
try {
bulkLoader.end();
} catch (Throwable th) {
- throw new HyracksDataException(th);
+ throw HyracksDataException.create(th);
} finally {
try {
indexHelper.close();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 6299982..c096f69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -89,18 +89,21 @@
try {
snapshotAccessor.close();
} catch (Throwable th) {
- hde = new HyracksDataException(th);
+ hde = HyracksDataException.create(th);
}
try {
adapter.close();
} catch (Throwable th) {
if (hde == null) {
- hde = new HyracksDataException(th);
+ hde = HyracksDataException.create(th);
} else {
hde.addSuppressed(th);
}
}
}
+ if (hde != null) {
+ throw hde;
+ }
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 3a06a2b..770e978 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -96,8 +96,7 @@
if (adaptorFactory == null) {
adaptorFactory = createExternalAdapterFactory(ctx);
}
- return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, policyAccessor,
- recordDescProvider, this);
+ return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, recordDescProvider, this);
}
private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
@@ -112,7 +111,7 @@
adapterFactory.setOutputType(adapterOutputType);
adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
} else {
RuntimeDataException err = new RuntimeDataException(
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8c6a420..16b8fba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,15 +18,14 @@
*/
package org.apache.asterix.external.operators;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -42,74 +41,97 @@
* The artifacts are lazily activated when a feed receives a subscription request.
*/
public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
-
- private final int partition;
- private final IAdapterFactory adapterFactory;
+ private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
private final FeedIntakeOperatorDescriptor opDesc;
- private volatile AdapterRuntimeManager adapterRuntimeManager;
+ private final FeedAdapter adapter;
+ private boolean poisoned = false;
public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory,
- int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider,
- FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
+ int partition, IRecordDescriptorProvider recordDescProvider,
+ FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws HyracksDataException {
super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
this.opDesc = feedIntakeOperatorDescriptor;
this.recordDesc = recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0);
- this.partition = partition;
- this.adapterFactory = adapterFactory;
+ adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, runtimeId.getPartition());
}
@Override
protected void start() throws HyracksDataException, InterruptedException {
+ String before = Thread.currentThread().getName();
+ Thread.currentThread().setName("Intake Thread");
try {
writer.open();
- Thread.currentThread().setName("Intake Thread");
- FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
- adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
- IFrame message = new VSizeFrame(ctx);
- TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+ synchronized (this) {
+ if (poisoned) {
+ return;
+ }
+ }
/*
* Set null feed message. Feed pipeline carries with it a message with each frame
* Initially, the message is set to a null message that can be changed by feed adapters.
* One use case is adapters which consume data sources that allow restartability. Such adapters
* can propagate progress information through the ingestion pipeline to storage nodes
*/
+ IFrame message = new VSizeFrame(ctx);
+ TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
message.getBuffer().flip();
- adapterRuntimeManager.start();
- synchronized (adapterRuntimeManager) {
- while (!adapterRuntimeManager.isDone()) {
- adapterRuntimeManager.wait();
- }
- }
- if (adapterRuntimeManager.isFailed()) {
- throw new RuntimeDataException(
- ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION);
- }
+ run();
} catch (Exception e) {
- /*
- * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another
- * node involved in the Hyracks job. As the Intake job involves only the intake operator, the exception is
- * indicative of a failure at the sibling intake operator location. The surviving intake partitions must
- * continue to live and receive data from the external source.
- */
- writer.fail();
+ LOGGER.log(Level.WARNING, "Failure during data ingestion", e);
throw e;
} finally {
writer.close();
+ Thread.currentThread().setName(before);
+ }
+ }
+
+ private void run() throws HyracksDataException {
+ // Start by getting the partition number from the manager
+ LOGGER.info("Starting ingestion for partition:" + ctx.getTaskAttemptId().getTaskId().getPartition());
+ try {
+ doRun();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Unhandled Exception", e);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void doRun() throws HyracksDataException, InterruptedException {
+ while (true) {
+ try {
+ // Start the adapter
+ adapter.start(ctx.getTaskAttemptId().getTaskId().getPartition(), writer);
+ // Adapter has completed execution
+ return;
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception during feed ingestion ", e);
+ throw HyracksDataException.create(e);
+ }
}
}
@Override
protected void abort() throws HyracksDataException, InterruptedException {
- if (adapterRuntimeManager != null) {
- adapterRuntimeManager.stop();
+ LOGGER.info(runtimeId + " aborting...");
+ synchronized (this) {
+ poisoned = true;
+ if (!adapter.stop()) {
+ LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread...");
+ taskThread.interrupt();
+ }
}
}
@Override
public String getStats() {
- if (adapterRuntimeManager != null) {
- return adapterRuntimeManager.getStats();
+ if (adapter != null) {
+ return "{\"adapter-stats\": " + adapter.getStats() + "}";
} else {
return "\"Runtime stats is not available.\"";
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
index e965dce..5fc9df3 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
@@ -1849,7 +1849,7 @@
result.setBooleanValue(false);
List<String> list0 = objectPool.stringArrayListPool.get();
- Set<String> set1 = new HashSet<String>();
+ Set<String> set1 = new HashSet<>();
split_string_list(str0, have_delimiter ? delimiter_string.charAt(0) : ',', list0);
split_string_set(str1, have_delimiter ? delimiter_string.charAt(0) : ',', set1);
@@ -1943,7 +1943,7 @@
return true;
}
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
state.decrementDepth();
expr.setParentScope(state.getCurAd());
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index c362969..2273bea 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -39,8 +39,6 @@
public class TestTypedAdapter extends FeedAdapter {
- private static final long serialVersionUID = 1L;
-
private final PipedOutputStream pos;
private final PipedInputStream pis;
@@ -145,11 +143,6 @@
}
@Override
- public boolean handleException(Throwable e) {
- return false;
- }
-
- @Override
public boolean pause() {
return false;
}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 1c28940..5262e1f 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -106,7 +106,7 @@
}
forwarder.close();
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
@@ -115,7 +115,7 @@
try {
return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
index f64206e..f15b1e5 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
@@ -95,14 +95,14 @@
this.outputStream.write(data, start + 1, len - 1);
}
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
private boolean toWriteTag(byte serializedTypeTag) {
boolean toWriteTag = itemTypeTag == ATypeTag.ANY;
- toWriteTag = toWriteTag
- || (itemTypeTag == ATypeTag.NULL && serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ toWriteTag =
+ toWriteTag || (itemTypeTag == ATypeTag.NULL && serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG);
return toWriteTag
|| (itemTypeTag == ATypeTag.MISSING && serializedTypeTag == ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
index 111557a..5df04f8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
@@ -118,17 +118,17 @@
private final RecordBuilder recordBuilder = new RecordBuilder();
private final RuntimeRecordTypeInfo requiredRecordTypeInfo = new RuntimeRecordTypeInfo();
- private final IBinaryHashFunction putHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE
- .createBinaryHashFunction();
- private final IBinaryHashFunction getHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE
- .createBinaryHashFunction();
+ private final IBinaryHashFunction putHashFunc =
+ ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction();
+ private final IBinaryHashFunction getHashFunc =
+ ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction();
private final BinaryEntry keyEntry = new BinaryEntry();
private final BinaryEntry valEntry = new BinaryEntry();
private final IVisitablePointable tempValReference = allocator.allocateEmpty();
- private final IBinaryComparator cmp = ListItemBinaryComparatorFactory.INSTANCE
- .createBinaryComparator();
- private BinaryHashMap hashMap = new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc,
- getHashFunc, cmp);
+ private final IBinaryComparator cmp =
+ ListItemBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+ private BinaryHashMap hashMap =
+ new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc, getHashFunc, cmp);
private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
private DataOutput out = resultStorage.getDataOutput();
@@ -157,7 +157,6 @@
vp0.set(argPtr0);
vp1.set(argPtr1);
-
ARecordVisitablePointable recordPointable = (ARecordVisitablePointable) vp0;
AListVisitablePointable listPointable = (AListVisitablePointable) vp1;
@@ -207,10 +206,10 @@
throw new AsterixException("Expected list of record, got "
+ PointableHelper.getTypeTag(inputFields.get(i)));
}
- List<IVisitablePointable> names = ((ARecordVisitablePointable) inputFields.get(i))
- .getFieldNames();
- List<IVisitablePointable> values = ((ARecordVisitablePointable) inputFields.get(i))
- .getFieldValues();
+ List<IVisitablePointable> names =
+ ((ARecordVisitablePointable) inputFields.get(i)).getFieldNames();
+ List<IVisitablePointable> values =
+ ((ARecordVisitablePointable) inputFields.get(i)).getFieldValues();
// Get name and value of the field to be added
// Use loop to account for the cases where users switches the order of the fields
@@ -241,8 +240,7 @@
tempValReference.set(entry.getBuf(), entry.getOffset(), entry.getLength());
// If value is not equal throw conflicting duplicate field, otherwise ignore
if (!PointableHelper.byteArrayEqual(valuePointable, tempValReference)) {
- throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME,
- getIdentifier());
+ throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier());
}
} else {
if (pos > -1) {
@@ -256,7 +254,7 @@
}
}
} catch (AsterixException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 3957c06..f2deb74 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -72,8 +72,8 @@
@Override
public void close() throws HyracksDataException {
try {
- INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext()
- .getServiceContext().getApplicationContext();
+ INcApplicationContext appCtx =
+ (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
IDatasetLifecycleManager datasetLifeCycleManager = appCtx.getDatasetLifecycleManager();
ILockManager lockManager = appCtx.getTransactionSubsystem().getLockManager();
ITransactionManager txnManager = appCtx.getTransactionSubsystem().getTransactionManager();
@@ -84,7 +84,7 @@
// flush the dataset synchronously
datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index b835b3a..b7a4c14 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -65,12 +65,12 @@
((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToCC(msg);
reponse = resourceIdResponseQ.take();
if (reponse.getException() != null) {
- throw new HyracksDataException(reponse.getException().getMessage());
+ throw HyracksDataException.create(reponse.getException());
}
}
return reponse.getResourceId();
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index ab7d657..b22a257 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -395,10 +395,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering intention to remove node id " + nodeId);
}
- if (!activeNcConfiguration.containsKey(nodeId)) {
+ if (activeNcConfiguration.containsKey(nodeId)) {
+ pendingRemoval.add(nodeId);
+ } else {
LOGGER.warning("Cannot register unknown node " + nodeId + " for pending removal");
}
- pendingRemoval.add(nodeId);
}
public synchronized boolean cancelRemovePending(String nodeId) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index 367616e..f76cb89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -46,8 +46,8 @@
private int pkHash;
public LockThenSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
- ITransactionSubsystem txnSubsystem,
- ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
+ ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx,
+ IOperatorNodePushable operatorNodePushable) {
super(datasetId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
this.logManager = txnSubsystem.getLogManager();
@@ -118,7 +118,7 @@
lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
}
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index e8de90d..b13a08e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -56,7 +56,7 @@
try {
lockManager.lock(datasetId, pkHash, LockMode.S, txnCtx);
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -70,7 +70,7 @@
try {
lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -80,7 +80,7 @@
try {
lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 9e96fbb..a6cb61c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -89,7 +89,7 @@
lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
}
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -99,7 +99,7 @@
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
log(pkHash, after, before);
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 5527f47..932c925 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -74,7 +74,7 @@
txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 8c5b099..b339d27 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -70,7 +70,7 @@
txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false);
return modCallback;
} catch (ACIDException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index a27f987..f897aca 100644
--- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -20,11 +20,11 @@
import static org.mockito.Mockito.mock;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.api.io.IIOManager;
@@ -35,11 +35,11 @@
class TestRuntimeContextProvider implements IAppRuntimeContextProvider {
- ThreadExecutor ate = new ThreadExecutor(Executors.defaultThreadFactory());
+ ExecutorService ate = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class);
@Override
- public ThreadExecutor getThreadExecutor() {
+ public ExecutorService getThreadExecutor() {
return ate;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index 8394057..067579e 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.algebricks.common.constraints;
+import java.util.Arrays;
+
public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionConstraint {
private final String[] locations;
@@ -33,4 +35,10 @@
public String[] getLocations() {
return locations;
}
+
+ @Override
+ public String toString() {
+ return Arrays.toString(locations);
+ }
+
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
index 68274ce..2314f88 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -71,8 +71,8 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
- new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ SplitterMaterializerActivityNode sma =
+ new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
builder.addActivity(this, sma);
builder.addSourceEdge(0, sma, 0);
for (int i = 0; i < outputArity; i++) {
@@ -168,7 +168,7 @@
writers[i].close();
} catch (Throwable th) {
if (hde == null) {
- hde = new HyracksDataException(th);
+ hde = HyracksDataException.create(th);
} else {
hde.addSuppressed(th);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 024f6f5..82f403e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -137,8 +137,8 @@
ForwardScriptOutput fso = new ForwardScriptOutput(parser, process.getInputStream());
outputPipe = new Thread(fso);
outputPipe.start();
- DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(),
- System.err);
+ DumpInStreamToPrintStream disps =
+ new DumpInStreamToPrintStream(process.getErrorStream(), System.err);
dumpStderr = new Thread(disps);
dumpStderr.start();
} catch (IOException e) {
@@ -174,7 +174,8 @@
outputPipe.join();
dumpStderr.join();
} catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
}
if (ret != 0) {
throw new HyracksDataException("Process exit value: " + ret);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index f4c5114..0e70759 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -59,7 +59,7 @@
public static void delete(File file) throws HyracksDataException {
try {
if (file.isDirectory()) {
- FileUtils.deleteDirectory(file);
+ deleteDirectory(file);
} else {
Files.delete(file.toPath());
}
@@ -89,4 +89,39 @@
throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_FILE, e, fileRef.getAbsolutePath());
}
}
+
+ public static void deleteDirectory(File directory) throws IOException {
+ if (!directory.exists()) {
+ return;
+ }
+ if (!FileUtils.isSymlink(directory)) {
+ cleanDirectory(directory);
+ }
+ Files.delete(directory.toPath());
+ }
+
+ public static void cleanDirectory(final File directory) throws IOException {
+ final File[] files = verifiedListFiles(directory);
+ for (final File file : files) {
+ delete(file);
+ }
+ }
+
+ private static File[] verifiedListFiles(File directory) throws IOException {
+ if (!directory.exists()) {
+ final String message = directory + " does not exist";
+ throw new IllegalArgumentException(message);
+ }
+
+ if (!directory.isDirectory()) {
+ final String message = directory + " is not a directory";
+ throw new IllegalArgumentException(message);
+ }
+
+ final File[] files = directory.listFiles();
+ if (files == null) { // null if security restricted
+ throw new IOException("Failed to list contents of " + directory);
+ }
+ return files;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
index c9cc71e..2c1ce37 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.test;
import java.util.Collection;
+import java.util.Collections;
public class FrameWriterTestUtils {
public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in the call to the method ";
@@ -32,6 +33,10 @@
Close
}
+ public static TestFrameWriter create() {
+ return create(Collections.emptyList(), Collections.emptyList(), false);
+ }
+
public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations,
Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
CountAnswer openAnswer =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 2685f60..7a9306c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -151,12 +151,18 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
+ LOGGER.log(Level.INFO, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(),
+ exceptions.get(0));
DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ LOGGER.log(Level.INFO, "Dataset job record is " + djr);
if (djr != null) {
+ LOGGER.log(Level.INFO, "Setting exceptions in Dataset job record");
djr.fail(exceptions);
}
final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
+ LOGGER.log(Level.INFO, "Job result info is " + jobResultInfo);
if (jobResultInfo != null) {
+ LOGGER.log(Level.INFO, "Setting exceptions in Job result info");
jobResultInfo.setException(exceptions.isEmpty() ? null : exceptions.get(0));
}
notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index f18a917..dbbaf9f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -66,7 +66,6 @@
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-
public class JobExecutor {
private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
@@ -190,11 +189,11 @@
private void startRunnableActivityClusters() throws HyracksException {
Set<TaskCluster> taskClusterRoots = new HashSet<>();
- findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
- .values());
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
- + inProgressTaskClusters);
+ findRunnableTaskClusterRoots(taskClusterRoots,
+ jobRun.getActivityClusterGraph().getActivityClusterMap().values());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO,
+ "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters);
}
if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
ccs.getWorkQueue()
@@ -344,8 +343,8 @@
for (int i = 0; i < tasks.length; ++i) {
Task ts = tasks[i];
TaskId tid = ts.getTaskId();
- TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
- tid.getPartition()), attempts), ts);
+ TaskAttempt taskAttempt = new TaskAttempt(tcAttempt,
+ new TaskAttemptId(new TaskId(tid.getActivityId(), tid.getPartition()), attempts), ts);
taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
locationMap.put(tid,
new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
@@ -496,8 +495,8 @@
final DeploymentId deploymentId = jobRun.getDeploymentId();
final JobId jobId = jobRun.getJobId();
final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
- final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>(
- jobRun.getConnectorPolicyMap());
+ final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+ new HashMap<>(jobRun.getConnectorPolicyMap());
INodeManager nodeManager = ccs.getNodeManager();
try {
byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
@@ -555,14 +554,14 @@
}
}
final JobId jobId = jobRun.getJobId();
- LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+ LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
INodeManager nodeManager = ccs.getNodeManager();
for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
final NodeControllerState node = nodeManager.getNodeControllerState(entry.getKey());
final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
if (node != null) {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
}
try {
node.getNodeController().abortTasks(jobId, abortTaskAttempts);
@@ -582,6 +581,7 @@
}
private void abortDoomedTaskClusters() throws HyracksException {
+ LOGGER.log(Level.INFO, "aborting doomed task clusters");
Set<TaskCluster> doomedTaskClusters = new HashSet<>();
for (TaskCluster tc : inProgressTaskClusters) {
// Start search at TCs that produce no outputs (sinks)
@@ -590,6 +590,7 @@
}
}
+ LOGGER.log(Level.INFO, "number of doomed task clusters found = " + doomedTaskClusters.size());
for (TaskCluster tc : doomedTaskClusters) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
if (tca != null) {
@@ -628,7 +629,7 @@
if ((maxState == null
|| (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED))
&& findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
- doomed = true;
+ doomed = true;
}
}
if (doomed) {
@@ -663,28 +664,36 @@
/**
* Indicates that a single task attempt has encountered a failure.
- * @param ta Failed Task Attempt
- * @param exceptions exeptions thrown during the failure
+ *
+ * @param ta
+ * Failed Task Attempt
+ * @param exceptions
 *            exceptions thrown during the failure
*/
public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
try {
- LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
+ LOGGER.log(Level.INFO, "Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTask().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
- LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
+ LOGGER.log(Level.INFO, "Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
abortDoomedTaskClusters();
- if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) {
+ int maxReattempts = jobRun.getActivityClusterGraph().getMaxReattempts();
+ LOGGER.log(Level.INFO, "Marking TaskAttempt " + ta.getTaskAttemptId()
+ + " as failed and the number of max re-attempts = " + maxReattempts);
+ if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
+ LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId());
abortJob(exceptions);
return;
}
+ LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId());
startRunnableActivityClusters();
} else {
- LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = "
- + lastAttempt);
+ LOGGER.warning(
+ "Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt);
}
} catch (Exception e) {
abortJob(Collections.singletonList(e));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 486e9c6..8f50087 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.control.cc.work;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
@@ -28,6 +30,7 @@
import org.apache.hyracks.control.cc.job.TaskAttempt;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
+ private static final Logger LOGGER = Logger.getLogger(TaskFailureWork.class.getName());
private final List<Exception> exceptions;
public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -38,6 +41,7 @@
@Override
protected void performEvent(TaskAttempt ta) {
+ LOGGER.log(Level.WARNING, "Executing task failure work for " + this, exceptions.get(0));
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 0189735..b654d44 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -120,7 +120,7 @@
manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
}
} catch (HyracksException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 92831f4..10c7415 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -229,7 +229,7 @@
try {
((FileHandle) fHandle).close();
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -242,7 +242,7 @@
try {
waf = File.createTempFile(prefix, WORKSPACE_FILE_SUFFIX, new File(dev.getMount(), waPath));
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
return dev.createFileRef(waPath + File.separator + waf.getName());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 7f5302a..4f5b556 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -28,9 +28,6 @@
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.application.NCServiceContext;
-/**
- * @author rico
- */
public class ApplicationMessageWork extends AbstractWork {
private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
private byte[] message;
@@ -63,6 +60,6 @@
@Override
public String toString() {
- return getName() + ": nodeID: " + nodeId;
+ return getName() + ": nodeId: " + nodeId;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index f4ee6b0..7728d16 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -18,12 +18,16 @@
*/
package org.apache.hyracks.control.nc.work;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
public class NotifyTaskCompleteWork extends AbstractWork {
+ private static final Logger LOGGER = Logger.getLogger(NotifyTaskCompleteWork.class.getName());
private final NodeControllerService ncs;
private final Task task;
@@ -40,8 +44,13 @@
ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
ncs.getId(), taskProfile);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "Failed notifying task complete for " + task.getTaskAttemptId(), e);
}
task.getJoblet().removeTask(task);
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index fa8ba28..7ed2c09 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -35,7 +35,6 @@
private final Task task;
private final JobId jobId;
private final TaskAttemptId taskId;
-
private final List<Exception> exceptions;
public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions, JobId jobId,
@@ -49,6 +48,8 @@
@Override
public void run() {
+ LOGGER.log(Level.WARNING, ncs.getId() + " is sending a notification to cc that task " + taskId + " has failed",
+ exceptions.get(0));
try {
IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
if (dpm != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
index e5f7d09..0c639db 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
@@ -104,7 +104,7 @@
fieldData.getDataOutput().writeInt(FrameConstants.FRAME_FIELD_MAGIC);
}
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
fEndOffsets[nextField++] = fieldData.getLength();
}
@@ -139,7 +139,7 @@
try {
fieldData.getDataOutput().write(bytes, start, length);
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
fEndOffsets[nextField++] = fieldData.getLength();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index 8ee4fa3..16c0a29 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -84,16 +84,16 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- ReplicatorMaterializerActivityNode sma = new ReplicatorMaterializerActivityNode(
- new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ ReplicatorMaterializerActivityNode sma =
+ new ReplicatorMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
builder.addActivity(this, sma);
builder.addSourceEdge(0, sma, 0);
int pipelineOutputIndex = 0;
int activityId = MATERIALIZE_READER_ACTIVITY_ID;
for (int i = 0; i < outputArity; i++) {
if (outputMaterializationFlags[i]) {
- MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
- new ActivityId(odId, activityId++));
+ MaterializeReaderActivityNode mra =
+ new MaterializeReaderActivityNode(new ActivityId(odId, activityId++));
builder.addActivity(this, mra);
builder.addBlockingEdge(sma, mra);
builder.addTargetEdge(i, mra, 0);
@@ -165,7 +165,7 @@
writers[i].close();
} catch (Throwable th) {
if (hde == null) {
- hde = new HyracksDataException(th);
+ hde = HyracksDataException.create(th);
} else {
hde.addSuppressed(th);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 7ff280b..6748a4d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -53,7 +53,8 @@
try {
wait();
} catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
}
}
if (failed) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index 253b3e3..63bb72b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -130,7 +130,7 @@
appenders[i].write(pWriters[i], true);
} catch (Throwable th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(th);
} else {
closeException.addSuppressed(th);
}
@@ -139,7 +139,7 @@
pWriters[i].close();
} catch (Throwable th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(th);
} else {
closeException.addSuppressed(th);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
index d748c8e..4246626 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
@@ -95,7 +95,7 @@
epWriters[i].close();
} catch (Throwable th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(th);
} else {
closeException.addSuppressed(th);
}
@@ -129,8 +129,8 @@
int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(0, nProducerPartitions);
- NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
- expectedPartitions);
+ NonDeterministicChannelReader channelReader =
+ new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
index a34a322..b94f305 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
@@ -66,7 +66,7 @@
try {
out.write(buffer.array());
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -79,7 +79,7 @@
try {
out.close();
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 3a3f414..76ebaa4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -76,8 +76,8 @@
final FileSplit[] splits = fileSplitProvider.getFileSplits();
IIOManager ioManager = ctx.getIoManager();
// Frame accessor
- final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(
- recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
+ final FrameTupleAccessor frameTupleAccessor =
+ new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
// Record descriptor
final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -131,7 +131,7 @@
try {
out.close();
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
};
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java
index e7e084c..fa812cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java
@@ -33,10 +33,8 @@
public IDeserializedMapper createMapper() throws HyracksDataException {
try {
return mapperClass.newInstance();
- } catch (InstantiationException e) {
- throw new HyracksDataException(e);
- } catch (IllegalAccessException e) {
- throw new HyracksDataException(e);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
index 2dbcf83..ecee076 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
@@ -24,7 +24,6 @@
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -37,7 +36,7 @@
try {
return new Mapper().new Context(conf, tid, null, null, null, null, null);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -46,7 +45,7 @@
TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
return new TaskAttemptContext(conf, tid);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
index dc6ca4e..96b0a23 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -38,7 +37,7 @@
try {
return new TaskAttemptContextImpl(conf, tid);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -47,7 +46,7 @@
TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
return new TaskAttemptContextImpl(conf, tid);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
index 3840167..33b84dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
@@ -39,7 +39,7 @@
confBytes = bos.toByteArray();
dos.close();
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -51,7 +51,7 @@
dis.close();
return conf;
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index e7a3111..dcf4508 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -124,7 +123,7 @@
tupleWriter.close(dos);
dos.close();
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
} finally {
Thread.currentThread().setContextClassLoader(ctxCL);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
index 927eb54..5cc09b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -103,7 +103,7 @@
dis.close();
return splits;
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
index 0e49fef..b02a97b 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
@@ -25,7 +25,6 @@
import java.io.Serializable;
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ConfFactory implements Serializable {
@@ -40,7 +39,7 @@
confBytes = bos.toByteArray();
dos.close();
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -52,7 +51,7 @@
dis.close();
return conf;
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
index ef95ee8..682b49a 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
@@ -30,7 +30,6 @@
import java.util.List;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
@SuppressWarnings("rawtypes")
@@ -96,7 +95,7 @@
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
DataInputStream dis = new DataInputStream(bis);
int size = dis.readInt();
- List<FileSplit> splits = new ArrayList<FileSplit>();
+ List<FileSplit> splits = new ArrayList<>();
for (int i = 0; i < size; i++) {
splits.add((FileSplit) defaultConstructor.newInstance());
splits.get(i).readFields(dis);
@@ -104,7 +103,7 @@
dis.close();
return splits;
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index c27e4ec..1f163ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -123,7 +122,7 @@
tupleWriter.close(dos);
dos.close();
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
} finally {
Thread.currentThread().setContextClassLoader(ctxCL);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index d9ab210..dea48bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -74,6 +74,7 @@
} catch (IOException e) {
throw new IPCException(e);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IPCException(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index b358f07..07d07c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -215,7 +215,7 @@
}
} catch (Throwable th) {
writer.fail();
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(th);
}
}
@@ -223,7 +223,7 @@
cursor.close();
} catch (Throwable th) {
if (closeException == null) {
- closeException = new HyracksDataException(th);
+ closeException = HyracksDataException.create(th);
} else {
closeException.addSuppressed(th);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
index a19e69a..157450a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.storage.am.common.freepage;
+import java.nio.charset.StandardCharsets;
+
import org.apache.hyracks.data.std.api.IValueReference;
public class MutableArrayValueReference implements IValueReference {
@@ -46,4 +48,9 @@
return array == null ? 0 : array.length;
}
+ @Override
+ public String toString() {
+ return new String(array, StandardCharsets.UTF_8);
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index eb8ec92..33bb60e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -63,4 +63,9 @@
public int getFileReferenceCount() {
return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
index 57b9092..0ba7c30 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
@@ -74,4 +74,8 @@
return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
}
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 8ff907a..b1005bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -221,6 +221,7 @@
* See PrefixMergePolicy.isMergeLagging() for more details.
*/
if (opType == LSMOperationType.FLUSH) {
+ opTracker.notifyAll();
while (mergePolicy.isMergeLagging(lsmIndex)) {
try {
opTracker.wait();
@@ -672,16 +673,27 @@
}
/***
- * Ensures the index is in a modifiable state
- * @throws HyracksDataException if the index is not in a modifiable state
+ * Ensures the index is in a modifiable state (no failed flushes)
+ *
+ * @throws HyracksDataException
+ * if the index is not in a modifiable state
*/
private void ensureIndexModifiable() throws HyracksDataException {
+ // if current memory component has a flush request, it means that flush didn't start for it
+ if (lsmIndex.hasFlushRequestForCurrentMutableComponent()) {
+ return;
+ }
// find if there is any memory component which is in a writable state or eventually will be in a writable state
for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) {
switch (memoryComponent.getState()) {
case INACTIVE:
+ // will be activated on next modification
+ case UNREADABLE_UNWRITABLE:
+ // flush completed successfully but readers are still inside
case READABLE_WRITABLE:
+ // writable
case READABLE_UNWRITABLE_FLUSHING:
+ // flush is ongoing
return;
default:
// continue to the next component
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index 6ccbc8d..40017d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.utils;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
@@ -32,8 +34,8 @@
public class ComponentMetadataUtil {
- public static final MutableArrayValueReference MARKER_LSN_KEY =
- new MutableArrayValueReference("Marker".getBytes());
+ private static final Logger LOGGER = Logger.getLogger(ComponentMetadataUtil.class.getName());
+ public static final MutableArrayValueReference MARKER_LSN_KEY = new MutableArrayValueReference("Marker".getBytes());
public static final long NOT_FOUND = -1L;
private ComponentMetadataUtil() {
@@ -71,16 +73,28 @@
* @throws HyracksDataException
*/
public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException {
+ LOGGER.log(Level.INFO, "Getting " + key + " from index " + index);
// Lock the opTracker to ensure index components don't change
synchronized (index.getOperationTracker()) {
index.getCurrentMemoryComponent().getMetadata().get(key, pointable);
if (pointable.getLength() == 0) {
+ LOGGER.log(Level.INFO, key + " was not found in mutable memory component of " + index);
// was not found in the in current mutable component, search in the other in memory components
fromImmutableMemoryComponents(index, key, pointable);
if (pointable.getLength() == 0) {
+ LOGGER.log(Level.INFO, key + " was not found in all immutable memory components of " + index);
// was not found in the in all in memory components, search in the disk components
fromDiskComponents(index, key, pointable);
+ if (pointable.getLength() == 0) {
+ LOGGER.log(Level.INFO, key + " was not found in all disk components of " + index);
+ } else {
+ LOGGER.log(Level.INFO, key + " was found in disk components of " + index);
+ }
+ } else {
+ LOGGER.log(Level.INFO, key + " was found in the immutable memory components of " + index);
}
+ } else {
+ LOGGER.log(Level.INFO, key + " was found in mutable memory component of " + index);
}
}
}
@@ -105,7 +119,9 @@
private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable)
throws HyracksDataException {
+ LOGGER.log(Level.INFO, "Getting " + key + " from disk components of " + index);
for (ILSMDiskComponent c : index.getImmutableComponents()) {
+ LOGGER.log(Level.INFO, "Getting " + key + " from disk components " + c);
c.getMetadata().get(key, pointable);
if (pointable.getLength() != 0) {
// Found
@@ -115,10 +131,13 @@
}
private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) {
+ LOGGER.log(Level.INFO, "Getting " + key + " from immutable memory components of " + index);
List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
int numOtherMemComponents = memComponents.size() - 1;
int next = index.getCurrentMemoryComponentIndex();
+ LOGGER.log(Level.INFO, index + " has " + numOtherMemComponents + " immutable memory components");
for (int i = 0; i < numOtherMemComponents; i++) {
+ LOGGER.log(Level.INFO, "trying to get " + key + " from immutable memory components number: " + (i + 1));
next = next - 1;
if (next < 0) {
next = memComponents.size() - 1;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index f2b3284..2470a39 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -75,4 +75,9 @@
public int getFileReferenceCount() {
return deletedKeysBTree.getBufferCache().getFileReferenceCount(deletedKeysBTree.getFileId());
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + ((OnDiskInvertedIndex) invIndex).getInvListsFile().getRelativePath();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index d89a31a..9332302 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -43,8 +43,8 @@
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
-import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
@@ -292,7 +292,7 @@
output.writeInt(invListBuilder.getListSize());
btreeTupleBuilder.addFieldEndOffset();
} catch (IOException e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
// Reset tuple reference and add it into the BTree load.
btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 982f89b..54ef122 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -76,4 +76,9 @@
public int getFileReferenceCount() {
return rtree.getBufferCache().getFileReferenceCount(rtree.getFileId());
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + rtree.getFileReference().getRelativePath();
+ }
}