Fixed retrieval of active-feed information after cluster shutdown: de-duplicate feed activity per connection (keeping only the latest non-terminated entry) and report per-partition ingestion throughput to the SuperFeedManager.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index ade794e..b806be0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -18,8 +18,10 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
@@ -84,7 +86,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -1369,6 +1370,7 @@
public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException,
RemoteException {
List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
+ Map<FeedConnectionId, FeedActivity> aFeeds = new HashMap<FeedConnectionId, FeedActivity>();
try {
ITupleReference searchKey = createTuple();
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
@@ -1389,14 +1391,21 @@
case FEED_RESUME:
case FEED_BEGIN:
if (!terminatedFeeds.contains(fid)) {
- activeFeeds.add(fa);
+ if (aFeeds.get(fid) == null) {
+ aFeeds.put(fid, fa);
+ }
}
break;
case FEED_END:
terminatedFeeds.add(fid);
break;
+ default: //ignore
}
}
+            // copy de-duplicated active feeds (latest activity per connection)
+            for (FeedActivity f : aFeeds.values()) {
+                activeFeeds.add(f);
+            }
return activeFeeds;
} catch (Exception e) {
throw new MetadataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index c4f84e5..639d5d1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -98,16 +98,36 @@
if (!(other instanceof FeedActivity)) {
return false;
}
- FeedActivity otherDataset = (FeedActivity) other;
- if (!otherDataset.dataverseName.equals(dataverseName)) {
+
+ if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
return false;
}
- if (!otherDataset.datasetName.equals(datasetName)) {
+ if (!((FeedActivity) other).datasetName.equals(datasetName)) {
return false;
}
+ if (!((FeedActivity) other).getFeedName().equals(feedName)) {
+ return false;
+ }
+ if (!((FeedActivity) other).getFeedActivityType().equals(activityType)) {
+ return false;
+ }
+ if (((FeedActivity) other).getActivityId() != (activityId)) {
+ return false;
+ }
+
return true;
}
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return dataverseName + "." + feedName + " --> " + datasetName + " " + activityType + " " + activityId;
+ }
+
public FeedActivityType getFeedActivityType() {
return activityType;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index c84560a..3a481f3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -9,13 +9,16 @@
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.metadata.feeds.SuperFeedManager.FeedReportMessageType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
public class FeedFrameWriter implements IFrameWriter {
@@ -53,6 +56,8 @@
private ExecutorService executorService;
+ private FrameTupleAccessor fta;
+
public enum Mode {
FORWARD,
STORE
@@ -60,7 +65,7 @@
public FeedFrameWriter(IFrameWriter writer, IOperatorNodePushable nodePushable, FeedConnectionId feedId,
FeedPolicyEnforcer policyEnforcer, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
- ExecutorService executorService) {
+ ExecutorService executorService, FrameTupleAccessor fta) {
this.writer = writer;
this.mode = Mode.FORWARD;
this.nodePushable = nodePushable;
@@ -76,11 +81,11 @@
executorService.execute(task);
sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, sfm, feedId, nodeId, feedRuntimeType,
- partition);
+ partition, FLUSH_THRESHOLD_TIME);
Timer timer = new Timer();
timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
}
-
+ this.fta = fta;
}
public Mode getMode() {
@@ -128,9 +133,10 @@
case FORWARD:
try {
if (collectStatistics) {
+ fta.reset(buffer);
framePushWait.notifyStart();
writer.nextFrame(buffer);
- framePushWait.notifyFinish();
+ framePushWait.notifyFinish(fta.getTupleCount());
} else {
writer.nextFrame(buffer);
}
@@ -171,9 +177,12 @@
private String nodeId;
private FeedRuntimeType feedRuntimeType;
private int partition;
+ private AtomicLong numTuplesInInterval = new AtomicLong(0);
+ private long period;
+ private boolean collectThroughput;
public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, SuperFeedManager sfm,
- FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition) {
+ FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period) {
this.nodePushable = nodePushable;
this.flushThresholdTime = flushThresholdTime;
this.state = State.INTIALIZED;
@@ -182,15 +191,19 @@
this.nodeId = nodeId;
this.feedRuntimeType = feedRuntimeType;
this.partition = partition;
+ this.period = period;
+ this.collectThroughput = feedRuntimeType.equals(FeedRuntimeType.INGESTION);
}
public void notifyStart() {
startTime = System.currentTimeMillis();
state = State.WAITING_FOR_FLUSH_COMPLETION;
+
}
- public void notifyFinish() {
+ public void notifyFinish(int numTuples) {
state = State.WAITNG_FOR_NEXT_FRAME;
+        numTuplesInInterval.addAndGet(numTuples); // atomic read-modify-write; set(get()+n) could lose concurrent updates
}
@Override
@@ -204,12 +217,29 @@
reportCongestionToSFM(currentTime - startTime);
}
}
+            // Snapshot and reset atomically so tuples recorded by concurrent
+            // notifyFinish() calls between the read and the reset are not lost.
+            long tupleCount = numTuplesInInterval.getAndSet(0);
+            if (collectThroughput) {
+                int instantTput = (int) ((tupleCount * 1000) / period);
+                reportThroughputToSFM(instantTput);
+            }
}
private void reportCongestionToSFM(long waitingTime) {
+ sendReportToSFM(waitingTime, FeedReportMessageType.CONGESTION);
+ }
+
+ private void reportThroughputToSFM(long throughput) {
+ sendReportToSFM(throughput, FeedReportMessageType.THROUGHPUT);
+ }
+
+ private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType) {
String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
String operator = "" + feedRuntimeType;
- String mesg = feedRep + "|" + operator + "|" + partition + "|" + waitingTime + "|" + EOL;
+ String mesg = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
+ + value + "|" + EOL;
+
Socket sc = null;
try {
while (sfm == null) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 859990c..8f3a9b6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
/**
@@ -42,6 +43,7 @@
private final FeedPolicyEnforcer policyEnforcer;
private FeedRuntime ingestionRuntime;
private final String nodeId;
+ private FrameTupleAccessor fta;
public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedAdapter adapter,
Map<String, String> feedPolicy, int partition, IngestionRuntime ingestionRuntime) {
@@ -53,19 +55,20 @@
this.feedPolicy = feedPolicy;
policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+ fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+
}
@Override
public void initialize() throws HyracksDataException {
AdapterRuntimeManager adapterRuntimeMgr = null;
- System.out.println("FEED INGESTION RUNTIME CALLED FOR " + partition);
try {
if (ingestionRuntime == null) {
ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, adapterRuntimeMgr);
ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
- FeedRuntimeType.INGESTION, partition, executorService);
+ FeedRuntimeType.INGESTION, partition, executorService, fta);
adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
if (adapter instanceof AbstractFeedDatasourceAdapter) {
@@ -75,7 +78,6 @@
LOGGER.info("Beginning new feed:" + feedId);
}
mWriter.open();
- System.out.println("STARTING FEED INGESTION RUNTIME FOR " + partition);
adapterRuntimeMgr.start();
} else {
adapterRuntimeMgr = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager();
@@ -85,7 +87,6 @@
adapter = adapterRuntimeMgr.getFeedAdapter();
writer.open();
adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
- System.out.println("RESUMED FEED INGESTION RUNTIME FOR " + partition);
adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 261585d..cdc4d2a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -2,8 +2,6 @@
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -16,8 +14,10 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
@@ -69,6 +69,7 @@
private boolean resumeOldState;
private ExecutorService feedExecService;
private String nodeId;
+ private FrameTupleAccessor fta;
public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
@@ -80,6 +81,7 @@
this.runtimeType = runtimeType;
this.feedId = feedConnectionId;
this.nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+ fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
}
@Override
@@ -102,7 +104,7 @@
resumeOldState = true;
}
FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId, runtimeType,
- partition, feedExecService);
+ partition, feedExecService, fta);
coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
coreOperatorNodePushable.open();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index dc484a7..2f499e6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -48,6 +48,11 @@
private boolean isLocal = false;
+ public enum FeedReportMessageType {
+ CONGESTION,
+ THROUGHPUT
+ }
+
public SuperFeedManager(FeedConnectionId feedId, String nodeId, int port) throws Exception {
this.feedConnectionId = feedId;
this.nodeId = nodeId;
@@ -157,9 +162,15 @@
while (true) {
try {
String message = messages.take();
- String[] msgComp = message.split("|");
+                            String[] messageComponents = message.split("\\|");
+                            SuperFeedManager.FeedReportMessageType mesgType = FeedReportMessageType
+                                    .valueOf(messageComponents[0].toUpperCase());
+                            switch (mesgType) {
+                                case THROUGHPUT: // TODO: process throughput reports
+                                case CONGESTION: // TODO: process congestion reports
+                            }
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Congestion Reported" + message);
+ LOGGER.warning(message);
}
} catch (InterruptedException ie) {
throw ie;