checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index ed0cab1..b519a39 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -33,7 +33,6 @@
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.application.ICCApplicationEntryPoint;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index cebc710..0e22cc0 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -82,6 +82,7 @@
if (!work.isEmpty()) {
executeWorkSet(work);
}
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
similarity index 75%
rename from asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
rename to asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index c436fde..8e01f09 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -12,8 +12,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.metadata.feeds;
+package edu.uci.ics.asterix.hyracks.bootstrap;
+import java.io.PrintWriter;
import java.io.Serializable;
import java.rmi.RemoteException;
import java.util.ArrayList;
@@ -29,7 +30,15 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
+import edu.uci.ics.asterix.aql.expression.DataverseDecl;
+import edu.uci.ics.asterix.aql.expression.Identifier;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure.FailureType;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -39,9 +48,13 @@
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -70,6 +83,7 @@
private LinkedBlockingQueue<Message> jobEventInbox;
private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+ private State state;
private FeedLifecycleListener() {
jobEventInbox = new LinkedBlockingQueue<Message>();
@@ -80,6 +94,7 @@
new Thread(feedJobNotificationHandler).start();
new Thread(feedWorkRequestResponseHandler).start();
ClusterManager.INSTANCE.registerSubscriber(this);
+ state = AsterixClusterProperties.INSTANCE.getState();
}
private final FeedJobNotificationHandler feedJobNotificationHandler;
@@ -107,16 +122,19 @@
JobSpecification spec = acggf.getJobSpecification();
boolean feedIngestionJob = false;
FeedId feedId = null;
+ String feedPolicy = null;
for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
if (!(opDesc instanceof FeedIntakeOperatorDescriptor)) {
continue;
}
feedId = ((FeedIntakeOperatorDescriptor) opDesc).getFeedId();
+ feedPolicy = ((FeedIntakeOperatorDescriptor) opDesc).getFeedPolicy().get(
+ BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
feedIngestionJob = true;
break;
}
if (feedIngestionJob) {
- feedJobNotificationHandler.registerFeed(feedId, jobId, spec);
+ feedJobNotificationHandler.registerFeed(feedId, jobId, spec, feedPolicy);
}
}
@@ -168,11 +186,11 @@
return registeredFeeds.containsKey(jobId);
}
- public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec) {
+ public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec, String feedPolicy) {
if (registeredFeeds.containsKey(jobId)) {
throw new IllegalStateException(" Feed already registered ");
}
- registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec));
+ registeredFeeds.put(jobId, new FeedInfo(feedId, jobSpec, feedPolicy));
}
@Override
@@ -249,6 +267,7 @@
feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, feedInfo.feedPolicy);
FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedActivityDetails);
@@ -316,10 +335,12 @@
public List<String> ingestLocations = new ArrayList<String>();
public List<String> computeLocations = new ArrayList<String>();
public JobInfo jobInfo;
+ public String feedPolicy;
- public FeedInfo(FeedId feedId, JobSpecification jobSpec) {
+ public FeedInfo(FeedId feedId, JobSpecification jobSpec, String feedPolicy) {
this.feedId = feedId;
this.jobSpec = jobSpec;
+ this.feedPolicy = feedPolicy;
}
}
@@ -405,7 +426,26 @@
@Override
public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
- // TODO Auto-generated method stub
+ State newState = AsterixClusterProperties.INSTANCE.getState();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
+ }
+ if (!newState.equals(state)) {
+ if (newState == State.ACTIVE) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
+ }
+ try {
+ FeedsActivator activator = new FeedsActivator();
+ (new Thread(activator)).start();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in resuming feeds" + e.getMessage());
+ }
+ }
+ }
+ state = newState;
+ }
return null;
}
@@ -422,24 +462,84 @@
@Override
public void notifyStateChange(State previousState, State newState) {
- switch(newState){
- case ACTIVE:
- if(previousState.equals((State.UNUSABLE)){
- resumeActiveFeeds();
- }
- break;
- }
-
+ switch (newState) {
+ case ACTIVE:
+ if (previousState.equals(State.UNUSABLE)) {
+ try {
+ FeedsActivator activator = new FeedsActivator();
+ (new Thread(activator)).start();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in resuming feeds" + e.getMessage());
+ }
+ }
+ }
+ break;
+ }
+
}
- private void resumeActiveFeeds() {
- MetadataTransactionContext ctx = null;
- try {
- ctx = MetadataManager.INSTANCE.beginTransaction();
- MetadataManager.INSTANCE.getActiveFeeds(ctx);
- } catch (Exception e) {
+ private static class FeedsActivator implements Runnable {
+
+ @Override
+ public void run() {
+ MetadataTransactionContext ctx = null;
+
+ SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
+ PrintWriter writer = new PrintWriter(System.out, true);
+ try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Attempting to Resume feeds!!!!!!");
+ }
+ Thread.sleep(2000);
+ MetadataManager.INSTANCE.init();
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ List<FeedActivity> activeFeeds = MetadataManager.INSTANCE.getActiveFeeds(ctx);
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ for (FeedActivity fa : activeFeeds) {
+
+ String feedPolicy = fa.getFeedActivityDetails().get(FeedActivityDetails.FEED_POLICY_NAME);
+ String dataverse = fa.getDataverseName();
+ String datasetName = fa.getDatasetName();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resuming loser feed: " + dataverse + ":" + datasetName + " using policy "
+ + feedPolicy);
+ }
+ try {
+ DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+ BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(
+ datasetName), feedPolicy, 0);
+ List<Statement> statements = new ArrayList<Statement>();
+ statements.add(dataverseDecl);
+ statements.add(stmt);
+ AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
+ translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resumed feed: " + dataverse + ":" + datasetName + " using policy "
+ + feedPolicy);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + datasetName
+ + " using policy " + feedPolicy + " Exception " + e.getMessage());
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ try {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } catch (Exception e1) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Exception in aborting" + e.getMessage());
+ }
+ throw new IllegalStateException(e1);
+ }
+ }
}
+
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
similarity index 95%
rename from asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java
rename to asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index 8f9beb7..8a889c9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -1,24 +1,23 @@
-package edu.uci.ics.asterix.metadata.feeds;
+package edu.uci.ics.asterix.hyracks.bootstrap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
-import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailureReport;
+import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedInfo;
import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailureReport;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedInfo;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
index c643c42..b329171 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
@@ -99,7 +99,7 @@
// instance.setState(State.UNUSABLE);
}
if (!(instance.getState().equals(State.UNUSABLE))) {
- // instance.setState(State.ACTIVE);
+ instance.setState(State.ACTIVE);
}
} else {
if (state.getProcesses() != null && state.getProcesses().size() > 0) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 5618ef8..1854245 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -60,7 +60,8 @@
private ConfFactory confFactory;
private IAType atype;
private boolean configured = false;
- public static Scheduler hdfsScheduler = initializeHDFSScheduler();
+ public static Scheduler hdfsScheduler;
+ private static boolean initialized = false;
private static Scheduler initializeHDFSScheduler() {
ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
@@ -124,6 +125,10 @@
@Override
public void configure(Map<String, Object> configuration) throws Exception {
+ if (!initialized) {
+ hdfsScheduler = initializeHDFSScheduler();
+ initialized = true;
+ }
this.configuration = configuration;
JobConf conf = configureJobConf(configuration);
confFactory = new ConfFactory(conf);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 534b495..e3deba9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -79,11 +79,14 @@
while (continueIngestion) {
tupleBuilder.reset();
try {
+ System.out.println("requesting next tuple");
inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput());
switch (inflowState) {
case DATA_AVAILABLE:
tupleBuilder.addFieldEndOffset();
+ System.out.println("appending tuple");
appendTupleToFrame(writer);
+ System.out.println("appended tuple");
tupleCount++;
break;
case NO_MORE_DATA:
@@ -103,6 +106,7 @@
}
} catch (Exception failureException) {
try {
+ failureException.printStackTrace();
boolean continueIngestion = policyEnforcer.handleSoftwareFailure(failureException);
if (continueIngestion) {
pullBasedFeedClient.resetOnFailure(failureException);
@@ -120,7 +124,9 @@
private void appendTupleToFrame(IFrameWriter writer) throws HyracksDataException {
if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ System.out.println("flushing frame");
FrameUtils.flushFrame(frame, writer);
+ System.out.println("flushed frame");
appender.reset(frame, true);
if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index a5cb037..a11197b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -71,6 +71,7 @@
@Override
public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
try {
+ System.out.println("Setting next record");
InflowState state = setNextRecord();
boolean first = true;
switch (state) {
@@ -85,7 +86,7 @@
recordBuilder.init();
writeRecord(mutableRecord, dataOutput, recordBuilder);
break;
-
+
case DATA_NOT_AVAILABLE:
break;
case NO_MORE_DATA:
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index b84e357..7c6fba0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -123,7 +123,7 @@
@Override
public void stop() {
// TODO Auto-generated method stub
-
+
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index a008c1d..ba592c3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -661,8 +661,8 @@
}
@Override
- public Collection<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException {
- Collection<FeedActivity> feedActivities = null;
+ public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException {
+ List<FeedActivity> feedActivities = null;
try {
feedActivities = metadataNode.getActiveFeeds(ctx.getJobId());
} catch (RemoteException e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b180063..8bfca10 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -17,7 +17,6 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -1245,7 +1244,8 @@
}
@Override
- public Collection<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException {
+ public List<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException {
+ List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
try {
ITupleReference searchKey = createTuple();
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
@@ -1262,13 +1262,17 @@
initiatedFeeds.put(fid, fa);
break;
case FEED_FAILURE:
+ break;
case FEED_END:
fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
initiatedFeeds.remove(fid);
break;
}
}
- return initiatedFeeds.values();
+ for (FeedActivity fa : initiatedFeeds.values()) {
+ activeFeeds.add(fa);
+ }
+ return activeFeeds;
} catch (Exception e) {
throw new MetadataException(e);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index a7833d2..d71db58 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -485,7 +485,7 @@
* @return
* @throws MetadataException
*/
- public Collection<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException;
+ public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx) throws MetadataException;
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index 9ab483d..e789817 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -531,5 +531,5 @@
* @throws MetadataException
* @throws RemoteException
*/
- public Collection<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException;
+ public List<FeedActivity> getActiveFeeds(JobId jobId) throws MetadataException, RemoteException;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 9544abd..775ef9e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -474,6 +474,7 @@
FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
dataset.getDataverseName(), dataset.getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
feedDesc, feedPolicy.getProperties());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index 72a1f03..48f16b6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.metadata.entities;
-import java.util.List;
import java.util.Map;
import edu.uci.ics.asterix.metadata.MetadataCache;
@@ -54,6 +53,7 @@
public static final String INGESTION_RATE = "ingestion-rate";
public static final String EXCEPTION_LOCATION = "exception-location";
public static final String EXCEPTION_MESSAGE = "exception-message";
+ public static final String FEED_POLICY_NAME = "feed-policy-name";
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFailureHandler.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFailureHandler.java
deleted file mode 100644
index 77dbea0..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFailureHandler.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.event.schema.cluster.Node;
-import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailureReport;
-import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedInfo;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-
-public class FeedFailureHandler implements Runnable {
-
- private LinkedBlockingQueue<FeedFailureReport> inbox = null;
-
- public FeedFailureHandler(LinkedBlockingQueue<FeedFailureReport> inbox) {
- this.inbox = inbox;
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- FeedFailureReport failureReport = inbox.take();
- Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
- for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
- FeedInfo feedInfo = entry.getKey();
- List<FeedFailure> feedFailures = entry.getValue();
- for (FeedFailure feedFailure : feedFailures) {
- switch (feedFailure.failureType) {
- case COMPUTE_NODE:
- case INGESTION_NODE:
- Map<FeedInfo, List<FailureType>> failuresBecauseOfThisNode = failureMap
- .get(feedFailure.nodeId);
- if (failuresBecauseOfThisNode == null) {
- failuresBecauseOfThisNode = new HashMap<FeedInfo, List<FailureType>>();
- failuresBecauseOfThisNode.put(feedInfo, new ArrayList<FailureType>());
- failureMap.put(feedFailure.nodeId, failuresBecauseOfThisNode);
- }
- List<FailureType> feedF = failuresBecauseOfThisNode.get(feedInfo);
- if (feedF == null) {
- feedF = new ArrayList<FailureType>();
- failuresBecauseOfThisNode.put(feedInfo, feedF);
- }
- feedF.add(feedFailure.failureType);
- break;
- case STORAGE_NODE:
- }
- }
- }
-
- try {
- correctFailure(failureMap);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- }
-
- private void correctFailure(Map<String, Map<FeedInfo, List<FailureType>>> failureMap) throws AsterixException {
- for (String nodeId : failureMap.keySet()) {
- Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
- if (node != null) {
- ClusterManager.INSTANCE.addNode(node);
- }
- }
-
- }
-
-}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 4270fe2..28d0da3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -64,4 +64,8 @@
public FeedId getFeedId() {
return feedId;
}
+
+ public Map<String, String> getFeedPolicy() {
+ return feedPolicy;
+ }
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 4742b16..5ba983e 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -19,13 +19,10 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
import edu.uci.ics.asterix.tools.external.data.DataGenerator.Message;
import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessageIterator;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
@@ -36,6 +33,9 @@
public class SyntheticTwitterFeedAdapter extends PullBasedAdapter {
private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedAdapter.class.getName());
+
private Map<String, Object> configuration;
public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, ARecordType outputType,
@@ -116,6 +116,7 @@
private void writeTweet(TweetMessage next) {
//tweet id
+ LOGGER.info("Generating next tweet");
((AMutableString) mutableFields[0]).setValue(next.getTweetid());
mutableRecord.setValueAtPos(0, mutableFields[0]);
@@ -149,8 +150,10 @@
// text
Message m = next.getMessageText();
char[] content = m.getMessage();
- ((AMutableString) mutableFields[5]).setValue(new String(content, 0, m.getLength()));
+ String tweetText = new String(content, 0, m.getLength());
+ ((AMutableString) mutableFields[5]).setValue(tweetText);
mutableRecord.setValueAtPos(5, mutableFields[5]);
+ LOGGER.info(tweetText);
}
@@ -168,10 +171,12 @@
@Override
public InflowState setNextRecord() throws Exception {
+ LOGGER.info("requesting next tweet");
boolean moreData = tweetIterator.hasNext();
if (!moreData) {
return InflowState.NO_MORE_DATA;
- }
+ }
+ LOGGER.info("writing next tweet");
writeTweet(tweetIterator.next());
if (tweetInterval != 0) {
tweetCount++;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 3d05870..036cb1c 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -98,9 +98,13 @@
}
List<String> storageNodes = ng.getNodeNames();
Set<String> nodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
- nodes.removeAll(storageNodes);
+ String ingestionLocation = null;
+ if (nodes.size() > storageNodes.size()) {
+ nodes.removeAll(storageNodes);
+ }
+ String[] nodesArray = nodes.toArray(new String[] {});
Random r = new Random();
- String ingestionLocation = nodes.toArray(new String[] {})[r.nextInt(nodes.size())];
+ ingestionLocation = nodesArray[r.nextInt(nodes.size())];
return new AlgebricksAbsolutePartitionConstraint(new String[] { ingestionLocation });
}