added managix commands for starting/stopping a node
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 91b4abb..2292cdf 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -86,7 +86,9 @@
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -578,6 +580,10 @@
}
nodegroupName = dataverse + ":" + dd.getName().getValue();
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
+ //TODO: Remove this hack. In the future we will not mandate that the metadata node be one of the
+ // storage nodes. We currently require it so that the ingestion node does not coincide with
+ // the metadata node.
+ nodeNames.add(metadataNodeName);
} else {
nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
}
@@ -630,6 +636,16 @@
}
}
+ if (ds.getDatasetType().equals(DatasetType.FEED)) {
+ FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
+ null);
+ boolean activeFeed = FeedOperations.isFeedActive(fa);
+ if (activeFeed) {
+ throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+ + " Operation not supported.");
+ }
+ }
+
//#. add a new index with PendingAddOp
Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
@@ -904,6 +920,16 @@
}
}
+ if (ds.getDatasetType().equals(DatasetType.FEED)) {
+ FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
+ null);
+ boolean activeFeed = FeedOperations.isFeedActive(fa);
+ if (activeFeed) {
+ throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+ + " Operation not supported.");
+ }
+ }
+
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
//#. prepare jobs to drop the datatset and the indexes in NC
@@ -1012,6 +1038,16 @@
+ dataverseName);
}
+ if (ds.getDatasetType().equals(DatasetType.FEED)) {
+ FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, datasetName,
+ null);
+ boolean activeFeed = FeedOperations.isFeedActive(fa);
+ if (activeFeed) {
+ throw new AsterixException("Feed " + datasetName + " is currently " + FeedState.ACTIVE + "."
+ + " Operation not supported.");
+ }
+ }
+
if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
indexName = stmtIndexDrop.getIndexName().getValue();
Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
@@ -1380,7 +1416,7 @@
}
FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName, bfs
- .getDatasetName().getValue());
+ .getDatasetName().getValue(), null);
boolean isFeedActive = FeedOperations.isFeedActive(recentActivity);
if (isFeedActive && !bfs.isForceBegin()) {
throw new AsterixException("Feed " + bfs.getDatasetName().getValue()
@@ -1442,7 +1478,7 @@
}
FeedActivity feedActivity = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx, dataverseName,
- datasetName);
+ datasetName, FeedActivityType.FEED_BEGIN, FeedActivityType.FEED_RESUME);
boolean isFeedActive = FeedOperations.isFeedActive(feedActivity);
if (!isFeedActive) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index a379a24..024e660 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -14,10 +14,6 @@
*/
package edu.uci.ics.asterix.file;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
@@ -25,10 +21,9 @@
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.feeds.AlterFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.metadata.feeds.FeedMessage;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
@@ -109,9 +104,7 @@
feedMessage = new FeedMessage(MessageType.END);
break;
case ALTER:
- Map<String, Object> wrappedProperties = new HashMap<String, Object>();
- wrappedProperties.putAll(controlFeedStatement.getProperties());
- feedMessage = new AlterFeedMessage(wrappedProperties);
+ feedMessage = new AlterFeedMessage(controlFeedStatement.getProperties());
break;
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 15e6cd5..1d8ba1d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -34,6 +34,8 @@
import edu.uci.ics.asterix.api.common.SessionConfig;
import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.expression.BeginFeedStatement;
+import edu.uci.ics.asterix.aql.expression.ControlFeedStatement;
+import edu.uci.ics.asterix.aql.expression.ControlFeedStatement.OperationType;
import edu.uci.ics.asterix.aql.expression.DataverseDecl;
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aql.translator.AqlTranslator;
@@ -52,10 +54,10 @@
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
@@ -66,6 +68,7 @@
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -87,6 +90,7 @@
private LinkedBlockingQueue<Message> jobEventInbox;
private LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
+ private Map<FeedInfo, List<String>> dependentFeeds = new HashMap<FeedInfo, List<String>>();
private State state;
@@ -127,14 +131,13 @@
JobSpecification spec = acggf.getJobSpecification();
boolean feedIngestionJob = false;
FeedId feedId = null;
- String feedPolicy = null;
+ Map<String, String> feedPolicy = null;
for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
if (!(opDesc instanceof FeedIntakeOperatorDescriptor)) {
continue;
}
feedId = ((FeedIntakeOperatorDescriptor) opDesc).getFeedId();
- feedPolicy = ((FeedIntakeOperatorDescriptor) opDesc).getFeedPolicy().get(
- BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ feedPolicy = ((FeedIntakeOperatorDescriptor) opDesc).getFeedPolicy();
feedIngestionJob = true;
break;
}
@@ -191,7 +194,7 @@
return registeredFeeds.containsKey(jobId);
}
- public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec, String feedPolicy) {
+ public void registerFeed(FeedId feedId, JobId jobId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
if (registeredFeeds.containsKey(jobId)) {
throw new IllegalStateException(" Feed already registered ");
}
@@ -284,15 +287,20 @@
feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS, storageLocs.toString());
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, feedInfo.feedPolicy);
-
- FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
- feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedActivityDetails);
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME,
+ feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ FeedActivity fa = MetadataManager.INSTANCE.getRecentFeedActivity(mdTxnCtx,
+ feedInfo.feedId.getDataverse(), feedInfo.feedId.getDataset(), null);
+ FeedActivityType nextState = fa != null
+ && fa.getActivityType().equals(FeedActivityType.FEED_RECOVERY) ? FeedActivityType.FEED_RESUME
+ : FeedActivityType.FEED_BEGIN;
+ FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
+ feedInfo.feedId.getDataset(), nextState, feedActivityDetails);
MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedId(feedInfo.feedId.getDataverse(),
feedInfo.feedId.getDataset()), feedActivity);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -353,9 +361,9 @@
public List<String> computeLocations = new ArrayList<String>();
public List<String> storageLocations = new ArrayList<String>();
public JobInfo jobInfo;
- public String feedPolicy;
+ public Map<String, String> feedPolicy;
- public FeedInfo(FeedId feedId, JobSpecification jobSpec, String feedPolicy) {
+ public FeedInfo(FeedId feedId, JobSpecification jobSpec, Map<String, String> feedPolicy) {
this.feedId = feedId;
this.jobSpec = jobSpec;
this.feedPolicy = feedPolicy;
@@ -385,19 +393,71 @@
}
failures.add(new FeedFailure(FeedFailure.FailureType.COMPUTE_NODE, deadNodeId));
}
+ if (feedInfo.storageLocations.contains(deadNodeId)) {
+ List<FeedFailure> failures = failureReport.failures.get(feedInfo);
+ if (failures == null) {
+ failures = new ArrayList<FeedFailure>();
+ failureReport.failures.put(feedInfo, failures);
+ }
+ failures.add(new FeedFailure(FeedFailure.FailureType.STORAGE_NODE, deadNodeId));
+ }
}
}
-
- return handleFailure(failureReport);
+ if (failureReport.failures.isEmpty()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("No feed is affected by the failure of node(s): ");
+ for (String deadNodeId : deadNodeIds) {
+ builder.append(deadNodeId + " ");
+ }
+ LOGGER.info(builder.toString());
+ }
+ return new HashSet<IClusterManagementWork>();
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("Feed affected by the failure of node(s): ");
+ for (String deadNodeId : deadNodeIds) {
+ builder.append(deadNodeId + " ");
+ }
+ builder.append("\n");
+ for (FeedInfo fInfo : failureReport.failures.keySet()) {
+ builder.append(fInfo.feedId);
+ }
+ LOGGER.warning(builder.toString());
+ }
+ return handleFailure(failureReport);
+ }
}
private Set<IClusterManagementWork> handleFailure(FeedFailureReport failureReport) {
reportFeedFailure(failureReport);
Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
+ FeedPolicyAccessor fpa = null;
+ List<FeedInfo> feedsToTerminate = new ArrayList<FeedInfo>();
for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
FeedInfo feedInfo = entry.getKey();
+ fpa = new FeedPolicyAccessor(feedInfo.feedPolicy);
+ if (!fpa.continueOnHardwareFailure()) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Feed " + feedInfo.feedId + " is governed by policy "
+ + feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+ LOGGER.warning("Feed policy does not require feed to recover from hardware failure. Feed will terminate");
+ }
+ continue;
+ } else {
+ // record that the feed has entered recovery mode (registers a FEED_RECOVERY activity)
+ reportFeedRecoveryMode(feedInfo);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Feed " + feedInfo.feedId + " is governed by policy "
+ + feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+ LOGGER.info("Feed policy requires feed to recover from hardware failure. Attempting to recover feed");
+ }
+ }
+
List<FeedFailure> feedFailures = entry.getValue();
+ boolean recoveryPossible = true;
for (FeedFailure feedFailure : feedFailures) {
switch (feedFailure.failureType) {
case COMPUTE_NODE:
@@ -414,20 +474,64 @@
failuresBecauseOfThisNode.put(feedInfo, feedF);
}
feedF.add(feedFailure.failureType);
-
break;
case STORAGE_NODE:
+ recoveryPossible = false;
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Unrecoverable situation! lost storage node for the feed " + feedInfo.feedId);
}
+ List<String> requiredNodeIds = dependentFeeds.get(feedInfo);
+ if (requiredNodeIds == null) {
+ requiredNodeIds = new ArrayList<String>();
+ dependentFeeds.put(feedInfo, requiredNodeIds);
+ }
+ requiredNodeIds.add(feedFailure.nodeId);
+ failuresBecauseOfThisNode = failureMap.get(feedFailure.nodeId);
+ if (failuresBecauseOfThisNode != null) {
+ failuresBecauseOfThisNode.remove(feedInfo);
+ if (failuresBecauseOfThisNode.isEmpty()) {
+ failureMap.remove(feedFailure.nodeId);
+ }
+ }
+ feedsToTerminate.add(feedInfo);
break;
}
}
+ if (!recoveryPossible) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Terminating irrecoverable feed (loss of storage node) ");
+ }
+ }
}
- AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
- work.add(addNodesWork);
- feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
+ if (!feedsToTerminate.isEmpty()) {
+ Thread t = new Thread(new FeedsDeActivator(feedsToTerminate));
+ t.start();
+ }
+
+ int numRequiredNodes = 0;
+ for (Entry<String, Map<FeedInfo, List<FeedFailure.FailureType>>> entry : failureMap.entrySet()) {
+ Map<FeedInfo, List<FeedFailure.FailureType>> v = entry.getValue();
+ for (FeedInfo finfo : feedsToTerminate) {
+ v.remove(finfo);
+ }
+ if (v.size() > 0) {
+ numRequiredNodes++;
+ }
+ }
+
+ if (numRequiredNodes > 0) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Number of additional nodes requested " + numRequiredNodes);
+ }
+ AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
+ work.add(addNodesWork);
+ feedWorkRequestResponseHandler.registerFeedWork(addNodesWork.getWorkId(), failureReport);
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Not requesting any new node. Feeds unrecoverable until the lost node(s) rejoin");
+ }
+ }
return work;
}
@@ -463,6 +567,29 @@
}
}
+ private void reportFeedRecoveryMode(FeedInfo feedInfo) {
+ MetadataTransactionContext ctx = null;
+ FeedActivity fa = null;
+ Map<String, String> feedActivityDetails = new HashMap<String, String>();
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ fa = new FeedActivity(feedInfo.feedId.getDataverse(), feedInfo.feedId.getDataset(),
+ FeedActivityType.FEED_RECOVERY, feedActivityDetails);
+ MetadataManager.INSTANCE.registerFeedActivity(ctx, feedInfo.feedId, fa);
+
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (ctx != null) {
+ try {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } catch (Exception e2) {
+ e2.addSuppressed(e);
+ throw new IllegalStateException("Unable to abort transaction " + e2);
+ }
+ }
+ }
+ }
+
public static class FeedFailure {
public enum FailureType {
@@ -491,21 +618,39 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(joinedNodeId + " joined the cluster. " + "Asterix state: " + newState);
}
- if (!newState.equals(state)) {
- if (newState == State.ACTIVE) {
+
+ boolean needToReActivateFeeds = !newState.equals(state) && (newState == State.ACTIVE);
+ if (needToReActivateFeeds) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
+ }
+ try {
+ FeedsActivator activator = new FeedsActivator();
+ (new Thread(activator)).start();
+ } catch (Exception e) {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(joinedNodeId + " Resuming loser feeds (if any)");
- }
- try {
- FeedsActivator activator = new FeedsActivator();
- (new Thread(activator)).start();
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in resuming feeds" + e.getMessage());
- }
+ LOGGER.info("Exception in resuming feeds" + e.getMessage());
}
}
state = newState;
+ } else {
+ List<FeedInfo> feedsThatCanBeRevived = new ArrayList<FeedInfo>();
+ for (Entry<FeedInfo, List<String>> entry : dependentFeeds.entrySet()) {
+ List<String> requiredNodeIds = entry.getValue();
+ if (requiredNodeIds.contains(joinedNodeId)) {
+ requiredNodeIds.remove(joinedNodeId);
+ if (requiredNodeIds.isEmpty()) {
+ feedsThatCanBeRevived.add(entry.getKey());
+ }
+ }
+ }
+ if (!feedsThatCanBeRevived.isEmpty()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(joinedNodeId + " Resuming feeds after rejoining of node " + joinedNodeId);
+ }
+ FeedsActivator activator = new FeedsActivator(feedsThatCanBeRevived);
+ (new Thread(activator)).start();
+ }
}
return null;
}
@@ -542,12 +687,57 @@
private static class FeedsActivator implements Runnable {
+ private List<FeedInfo> feedsToRevive;
+ private Mode mode;
+
+ public enum Mode {
+ REVIVAL_POST_CLUSTER_REBOOT,
+ REVIVAL_POST_NODE_REJOIN
+ }
+
+ public FeedsActivator() {
+ this.mode = Mode.REVIVAL_POST_CLUSTER_REBOOT;
+ }
+
+ public FeedsActivator(List<FeedInfo> feedsToRevive) {
+ this.feedsToRevive = feedsToRevive;
+ this.mode = Mode.REVIVAL_POST_NODE_REJOIN;
+ }
+
@Override
public void run() {
+ switch (mode) {
+ case REVIVAL_POST_CLUSTER_REBOOT:
+ revivePostClusterReboot();
+ break;
+ case REVIVAL_POST_NODE_REJOIN:
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e1) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Attempt to resume feed interrupted");
+ }
+ throw new IllegalStateException(e1.getMessage());
+ }
+ for (FeedInfo finfo : feedsToRevive) {
+ try {
+ JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(finfo.jobSpec);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resumed feed :" + finfo.feedId + " job id " + jobId);
+ LOGGER.info("Job:" + finfo.jobSpec);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to resume feed " + finfo.feedId + " " + e.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ private void revivePostClusterReboot() {
MetadataTransactionContext ctx = null;
- SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
- PrintWriter writer = new PrintWriter(System.out, true);
try {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Attempting to Resume feeds!");
@@ -558,9 +748,7 @@
List<FeedActivity> activeFeeds = MetadataManager.INSTANCE.getActiveFeeds(ctx);
MetadataManager.INSTANCE.commitTransaction(ctx);
for (FeedActivity fa : activeFeeds) {
-
String feedPolicy = fa.getFeedActivityDetails().get(FeedActivityDetails.FEED_POLICY_NAME);
-
FeedPolicy policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, fa.getDataverseName(), feedPolicy);
if (policy == null) {
policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
@@ -571,36 +759,27 @@
+ fa.getDatasetName() + "." + " Unknown policy :" + feedPolicy);
}
}
+ continue;
}
- String dataverse = fa.getDataverseName();
- String datasetName = fa.getDatasetName();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Resuming loser feed: " + dataverse + ":" + datasetName + " using policy "
- + feedPolicy);
- }
- try {
- DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
- BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(
- datasetName), feedPolicy, 0);
- stmt.setForceBegin(true);
- List<Statement> statements = new ArrayList<Statement>();
- statements.add(dataverseDecl);
- statements.add(stmt);
- AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
- translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+ FeedPolicyAccessor fpa = new FeedPolicyAccessor(policy.getProperties());
+ if (fpa.autoRestartOnClusterReboot()) {
+ String dataverse = fa.getDataverseName();
+ String datasetName = fa.getDatasetName();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Resumed feed: " + dataverse + ":" + datasetName + " using policy "
- + feedPolicy);
+ LOGGER.info("Resuming feed after cluster revival: " + dataverse + ":" + datasetName
+ + " using policy " + feedPolicy);
}
- } catch (Exception e) {
- e.printStackTrace();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + datasetName
- + " using policy " + feedPolicy + " Exception " + e.getMessage());
+ reviveFeed(dataverse, datasetName, feedPolicy);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Feed " + fa.getDataverseName() + ":" + fa.getDatasetName()
+ + " governed by policy" + feedPolicy
+ + " does not state auto restart after cluster revival");
}
}
}
+
} catch (Exception e) {
e.printStackTrace();
try {
@@ -612,9 +791,85 @@
throw new IllegalStateException(e1);
}
}
+ }
+ private void reviveFeed(String dataverse, String dataset, String feedPolicy) {
+ PrintWriter writer = new PrintWriter(System.out, true);
+ SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
+ try {
+ DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(dataverse));
+ BeginFeedStatement stmt = new BeginFeedStatement(new Identifier(dataverse), new Identifier(dataset),
+ feedPolicy, 0);
+ stmt.setForceBegin(true);
+ List<Statement> statements = new ArrayList<Statement>();
+ statements.add(dataverseDecl);
+ statements.add(stmt);
+ AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
+ translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resumed feed: " + dataverse + ":" + dataset + " using policy " + feedPolicy);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in resuming loser feed: " + dataverse + ":" + dataset + " using policy "
+ + feedPolicy + " Exception " + e.getMessage());
+ }
+ }
}
}
+ private static class FeedsDeActivator implements Runnable {
+
+ private List<FeedInfo> feedsToTerminate;
+
+ public FeedsDeActivator(List<FeedInfo> feedsToTerminate) {
+ this.feedsToTerminate = feedsToTerminate;
+ }
+
+ @Override
+ public void run() {
+ for (FeedInfo feedInfo : feedsToTerminate) {
+ endFeed(feedInfo);
+ }
+ }
+
+ private void endFeed(FeedInfo feedInfo) {
+ MetadataTransactionContext ctx = null;
+ PrintWriter writer = new PrintWriter(System.out, true);
+ SessionConfig pc = new SessionConfig(true, false, false, false, false, false, true, false);
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ ControlFeedStatement stmt = new ControlFeedStatement(OperationType.END, new Identifier(
+ feedInfo.feedId.getDataverse()), new Identifier(feedInfo.feedId.getDataset()));
+ List<Statement> statements = new ArrayList<Statement>();
+ DataverseDecl dataverseDecl = new DataverseDecl(new Identifier(feedInfo.feedId.getDataverse()));
+ statements.add(dataverseDecl);
+ statements.add(stmt);
+ AqlTranslator translator = new AqlTranslator(statements, writer, pc, DisplayFormat.TEXT);
+ translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null, false);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("End urecoverable feed: " + feedInfo.feedId.getDataverse() + ":"
+ + feedInfo.feedId.getDataset());
+ }
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in ending loser feed: " + feedInfo.feedId + " Exception " + e.getMessage());
+ }
+ e.printStackTrace();
+ try {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } catch (Exception e2) {
+ e2.addSuppressed(e);
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Exception in aborting transaction! System is in inconsistent state");
+ }
+ }
+
+ }
+
+ }
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index 465a226..6880308 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -20,8 +20,11 @@
import java.util.Map;
import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -29,13 +32,14 @@
/**
* A factory class for creating the @see {CNNFeedAdapter}.
*/
-public class CNNFeedAdapterFactory implements IAdapterFactory {
+public class CNNFeedAdapterFactory implements ITypedAdapterFactory {
private static final long serialVersionUID = 1L;
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
private List<String> feedURLs = new ArrayList<String>();
private static Map<String, String> topicFeeds = new HashMap<String, String>();
+ private ARecordType recordType;
public static final String KEY_RSS_URL = "topic";
public static final String KEY_INTERVAL = "interval";
@@ -77,7 +81,7 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
- CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, ctx);
+ CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, recordType, ctx);
return cnnFeedAdapter;
}
@@ -92,13 +96,17 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws Exception {
this.configuration = configuration;
String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
initializeFeedURLs(rssURLProperty);
+ recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
+ false);
+
}
private void initializeFeedURLs(String rssURLProperty) {
@@ -126,8 +134,7 @@
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- AlgebricksPartitionConstraint partitionConstraint = new AlgebricksCountPartitionConstraint(feedURLs.size());
- return new AlgebricksCountPartitionConstraint(1);
+ return new AlgebricksCountPartitionConstraint(feedURLs.size());
}
@Override
@@ -135,4 +142,9 @@
return SupportedOperation.READ;
}
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return recordType;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index d50ea72..af057c9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -24,6 +24,9 @@
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -40,7 +43,7 @@
* A factory class for creating an instance of HDFSAdapter
*/
@SuppressWarnings("deprecation")
-public class HDFSAdapterFactory extends StreamBasedAdapterFactory {
+public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -99,7 +102,7 @@
return HDFS_ADAPTER_NAME;
}
- private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+ private JobConf configureJobConf(Map<String, String> configuration) throws Exception {
JobConf conf = new JobConf();
conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
@@ -124,7 +127,7 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
if (!initialized) {
hdfsScheduler = initializeHDFSScheduler();
initialized = true;
@@ -144,7 +147,7 @@
Arrays.fill(executed, false);
configured = true;
- atype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+ atype = (IAType) outputType;
configureFormat(atype);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index b29e150..991dadb 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -23,10 +23,11 @@
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.AdapterType;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -38,7 +39,7 @@
* A factory class for creating an instance of HiveAdapter
*/
@SuppressWarnings("deprecation")
-public class HiveAdapterFactory extends StreamBasedAdapterFactory {
+public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -106,32 +107,32 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
if (!configured) {
/** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
configureJobConf(configuration);
JobConf conf = configureJobConf(configuration);
confFactory = new ConfFactory(conf);
- clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+ clusterLocations = AsterixClusterProperties.INSTANCE.getClusterLocations();
int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
inputSplitsFactory = new InputSplitsFactory(inputSplits);
- Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+ Scheduler scheduler = HDFSAdapterFactory.hdfsScheduler;
readSchedule = scheduler.getLocationConstraints(inputSplits);
executed = new boolean[readSchedule.length];
Arrays.fill(executed, false);
- atype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+ atype = (IAType) outputType;
configureFormat(atype);
configured = true;
}
}
- private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+ private JobConf configureJobConf(Map<String, String> configuration) throws Exception {
JobConf conf = new JobConf();
/** configure hive */
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 5f23f96..7ea5d18 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -20,6 +20,8 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -32,7 +34,7 @@
* NCFileSystemAdapter reads external data residing on the local file system of
* an NC.
*/
-public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory {
+public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
@@ -62,10 +64,10 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
String[] splits = ((String) configuration.get(KEY_PATH)).split(",");
- IAType sourceDatatype = (IAType) configuration.get(KEY_SOURCE_DATATYPE);
+ IAType sourceDatatype = (IAType) outputType;
configureFileSplits(splits);
configureFormat(sourceDatatype);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index 5937013..9c137e1 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -17,8 +17,11 @@
import java.util.Map;
import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -28,11 +31,12 @@
* This adapter provides the functionality of fetching tweets from Twitter service
* via pull-based Twitter API.
*/
-public class PullBasedTwitterAdapterFactory implements IAdapterFactory {
+public class PullBasedTwitterAdapterFactory implements ITypedAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
+ private static ARecordType recordType;
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
@@ -55,8 +59,18 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws Exception {
this.configuration = configuration;
+ if (recordType == null) {
+ String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
+ IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING };
+ try {
+ recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create adapter output type");
+ }
+ }
}
@Override
@@ -64,4 +78,9 @@
return new AlgebricksCountPartitionConstraint(1);
}
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return recordType;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index d904af9..cc366f6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -21,6 +21,7 @@
import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
@@ -32,20 +33,20 @@
* Factory class for creating an instance of @see {RSSFeedAdapter}.
* RSSFeedAdapter provides the functionality of fetching an RSS based feed.
*/
-public class RSSFeedAdapterFactory implements IAdapterFactory {
+public class RSSFeedAdapterFactory implements ITypedAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
public static final String KEY_RSS_URL = "url";
public static final String KEY_INTERVAL = "interval";
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
private ARecordType recordType;
private List<String> feedURLs = new ArrayList<String>();
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
- RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, ctx);
+ RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
return rssFeedAdapter;
}
@@ -65,7 +66,7 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws Exception {
this.configuration = configuration;
String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
@@ -96,4 +97,9 @@
}
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return recordType;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 1287de4..8ddc515 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -31,7 +31,7 @@
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(StreamBasedAdapterFactory.class.getName());
- protected Map<String, Object> configuration;
+ protected Map<String, String> configuration;
protected static INodeResolver nodeResolver;
private static final INodeResolver DEFAULT_NODE_RESOLVER = new DNSResolverFactory().createNodeResolver();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 5debc53..017b511 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -19,6 +19,7 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
@@ -27,10 +28,11 @@
*/
public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IFeedAdapter {
- private static final long serialVersionUID = 2523303758114582251L;
+ private static final long serialVersionUID = 1L;
- public CNNFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
- super(configuration, ctx);
+ public CNNFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+ throws AsterixException {
+ super(configuration, recordType, ctx);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
index e8aadff..ee28c3a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
@@ -51,7 +51,7 @@
/**
* @param configuration
*/
- public boolean alter(Map<String, Object> configuration);
+ public boolean alter(Map<String, String> configuration);
public void stop();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index e162ffe..4022584 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -48,14 +48,14 @@
private ByteBuffer frame;
protected boolean continueIngestion = true;
protected boolean alterRequested = false;
- private Map<String, Object> modifiedConfiguration = null;
+ private Map<String, String> modifiedConfiguration = null;
private long tupleCount = 0;
private final IHyracksTaskContext ctx;
- protected Map<String, Object> configuration;
+ protected Map<String, String> configuration;
public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
- public PullBasedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) {
+ public PullBasedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
this.ctx = ctx;
this.configuration = configuration;
}
@@ -64,7 +64,7 @@
return tupleCount;
}
- public void alter(Map<String, Object> modifedConfiguration) {
+ public void alter(Map<String, String> modifedConfiguration) {
this.modifiedConfiguration = modifedConfiguration;
}
@@ -146,7 +146,7 @@
timer.cancel();
}
- public Map<String, Object> getConfiguration() {
+ public Map<String, String> getConfiguration() {
return configuration;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index a9820b7..41143d8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -43,12 +43,8 @@
return tweetClient;
}
- public PullBasedTwitterAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
+ public PullBasedTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) throws AsterixException {
super(configuration, ctx);
- String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- BuiltinType.ASTRING };
- recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
this.ctx = ctx;
tweetClient = new PullBasedTwitterFeedClient(ctx, this);
}
@@ -59,7 +55,7 @@
}
@Override
- public void alter(Map<String, Object> properties) {
+ public void alter(Map<String, String> properties) {
alterRequested = true;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 1c260c9..7a5aeea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -100,7 +100,7 @@
}
@Override
- public boolean alter(Map<String, Object> configuration) {
+ public boolean alter(Map<String, String> configuration) {
// TODO Auto-generated method stub
return false;
}
@@ -110,7 +110,7 @@
// TODO Auto-generated method stub
}
- private void initialize(Map<String, Object> params) {
+ private void initialize(Map<String, String> params) {
this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
this.query = new Query(keywords);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index bd503f2..102482d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -22,8 +22,6 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
@@ -36,22 +34,21 @@
private List<String> feedURLs = new ArrayList<String>();
private boolean isStopRequested = false;
private boolean isAlterRequested = false;
- private Map<String, Object> alteredParams = new HashMap<String, Object>();
+ private Map<String, String> alteredParams = new HashMap<String, String>();
private String id_prefix = "";
- private ARecordType recordType;
private IPullBasedFeedClient rssFeedClient;
+ private ARecordType recordType;
public boolean isStopRequested() {
return isStopRequested;
}
- public RSSFeedAdapter(Map<String, Object> configuration, IHyracksTaskContext ctx) throws AsterixException {
+ public RSSFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+ throws AsterixException {
super(configuration, ctx);
- recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
- false);
id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
+ this.recordType = recordType;
}
public void setStopRequested(boolean isStopRequested) {
@@ -59,7 +56,7 @@
}
@Override
- public void alter(Map<String, Object> properties) {
+ public void alter(Map<String, String> properties) {
isAlterRequested = true;
this.alteredParams = properties;
reconfigure(properties);
@@ -78,8 +75,8 @@
}
}
- protected void reconfigure(Map<String, Object> arguments) {
- String rssURLProperty = (String) configuration.get("KEY_RSS_URL");
+ protected void reconfigure(Map<String, String> arguments) {
+ String rssURLProperty = configuration.get("url");
if (rssURLProperty != null) {
initializeFeedURLs(rssURLProperty);
}
@@ -89,7 +86,7 @@
return isAlterRequested;
}
- public Map<String, Object> getAlteredParams() {
+ public Map<String, String> getAlteredParams() {
return alteredParams;
}
@@ -101,7 +98,7 @@
return rssFeedClient;
}
- public ARecordType getAdapterOutputType() {
+ public ARecordType getRecordType() {
return recordType;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 531eb3d..8a4b301 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -76,7 +76,7 @@
fetcher.addFetcherEventListener(listener);
mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
new AMutableString(null) };
- recordType = adapter.getAdapterOutputType();
+ recordType = adapter.getRecordType();
mutableRecord = new AMutableRecord(recordType, mutableFields);
tupleFieldValues = new String[recordType.getFieldNames().length];
}
@@ -140,7 +140,7 @@
}
@Override
- public boolean alter(Map<String, Object> configuration) {
+ public boolean alter(Map<String, String> configuration) {
// TODO Auto-generated method stub
return false;
}
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java
index 2f26547..45cc9f7 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CommandHandler.java
@@ -67,6 +67,17 @@
case HELP:
cmd = new HelpCommand();
break;
+ case STOPNODE:
+ cmd = new StopNodeCommand();
+ break;
+ case STARTNODE:
+ cmd = new StartNodeCommand();
+ break;
+ case VERSION:
+ cmd = new VersionCommand();
+ break;
+ default:
+ break;
}
cmd.execute(args);
}
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java
index b12b167..68f8532 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/HelpCommand.java
@@ -66,6 +66,15 @@
case LOG:
helpMessage = new LogCommand().getUsageDescription();
break;
+ case STOPNODE:
+ helpMessage = new StopNodeCommand().getUsageDescription();
+ break;
+ case STARTNODE:
+ helpMessage = new StartNodeCommand().getUsageDescription();
+ break;
+ case VERSION:
+ helpMessage = new VersionCommand().getUsageDescription();
+ break;
default:
helpMessage = "Unknown command " + command;
}
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java
index 7c68d1d..288c882 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/ICommand.java
@@ -31,7 +31,10 @@
UNINSTALL,
LOG,
SHUTDOWN,
- HELP
+ HELP,
+ STOPNODE,
+ STARTNODE,
+ VERSION
}
public void execute(String args[]) throws Exception;
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
index 0e0e6dd..53ec136 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
@@ -48,7 +48,6 @@
String eventHome = managixHome + File.separator + MANAGIX_INTERNAL_DIR;
AsterixEventService.initialize(conf, asterixDir, eventHome);
-
ILookupService lookupService = ServiceProvider.INSTANCE.getLookupService();
if (ensureLookupServiceIsRunning && !lookupService.isRunning(conf)) {
lookupService.startService(conf);
@@ -103,6 +102,8 @@
+ " Produce a tar archive contianing log files from the master and worker nodes" + "\n");
buffer.append("shutdown " + ":" + " Shutdown the installer service" + "\n");
buffer.append("help " + ":" + " Provides usage description of a command" + "\n");
+ buffer.append("version " + ":" + " Provides version of Asterix/Managix" + "\n");
+
buffer.append("\nTo get more information about a command, use managix help -cmd <command>");
LOGGER.info(buffer.toString());
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 4e5813b..9132c63 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -16,7 +16,6 @@
package edu.uci.ics.asterix.metadata;
import java.rmi.RemoteException;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,6 +32,7 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -617,12 +617,13 @@
}
@Override
- public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
- throws MetadataException {
+ public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName,
+ FeedActivityType... feedActivityTypes) throws MetadataException {
FeedActivity feedActivity = null;
try {
- feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), new FeedId(dataverseName, datasetName));
+ feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), new FeedId(dataverseName, datasetName),
+ feedActivityTypes);
} catch (RemoteException e) {
throw new MetadataException(e);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 01e9df6..ff3909a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -46,6 +46,7 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -1254,7 +1255,8 @@
}
@Override
- public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId) throws MetadataException, RemoteException {
+ public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId, FeedActivityType... activityType)
+ throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(feedId.getDataverse(), feedId.getDataset());
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
@@ -1264,7 +1266,17 @@
searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
if (!results.isEmpty()) {
Collections.sort(results);
- return results.get(results.size() - 1);
+ if (activityType == null || activityType.length == 0) {
+ return results.get(0);
+ } else {
+ for (FeedActivity result : results) {
+ for (FeedActivityType ft : activityType) {
+ if (result.getActivityType().equals(ft)) {
+ return result;
+ }
+ }
+ }
+ }
}
return null;
} catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 8623dbc..8befe1f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -28,6 +28,7 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -461,8 +462,8 @@
* @return
* @throws MetadataException
*/
- public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
- throws MetadataException;
+ public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName,
+ FeedActivityType... activityTypeFilter) throws MetadataException;
/**
* @param ctx
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index 3d8a0fe..3c2e241 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -18,7 +18,6 @@
import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
-import java.util.Collection;
import java.util.List;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -30,6 +29,7 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -484,7 +484,8 @@
* @throws MetadataException
* @throws RemoteException
*/
- public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId) throws MetadataException, RemoteException;
+ public FeedActivity getRecentFeedActivity(JobId jobId, FeedId feedId, FeedActivityType... feedActivityFilter)
+ throws MetadataException, RemoteException;
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 3d3bcac..4eeb072 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -64,6 +64,7 @@
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
@@ -164,7 +165,6 @@
private final AsterixStorageProperties storageProperties;
private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
- public static transient String SCHEDULER = "hdfs-scheduler";
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -358,9 +358,16 @@
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
- Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
- configuration.put("source-datatype", itemType);
- adapterFactory.configure(configuration);
+ Map<String, String> configuration = datasetDetails.getProperties();
+
+ switch (adapterFactory.getAdapterType()) {
+ case GENERIC:
+ ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) itemType);
+ break;
+ case TYPED:
+ ((ITypedAdapterFactory) adapterFactory).configure(configuration);
+ break;
+ }
} catch (AlgebricksException ae) {
throw ae;
} catch (Exception e) {
@@ -438,25 +445,23 @@
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
- Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
+ //Map<String, Object> configuration = this.wrapProperties(datasetDetails.getProperties());
+ Map<String, String> configuration = datasetDetails.getProperties();
switch (adapterFactory.getAdapterType()) {
case TYPED:
adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
+ ((ITypedAdapterFactory) adapterFactory).configure(configuration);
break;
case GENERIC:
String outputTypeName = datasetDetails.getProperties().get("output-type-name");
adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
outputTypeName).getDatatype();
+ ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
break;
default:
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
- configuration.put("source-datatype", adapterOutputType);
- adapterFactory.configure(configuration);
-
- } catch (AlgebricksException ae) {
- throw ae;
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("unable to create adapter " + e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index b57f8fe..8e6dab3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -41,6 +41,8 @@
FEED_BEGIN,
FEED_END,
FEED_FAILURE,
+ FEED_RECOVERY,
+ FEED_RESUME,
FEED_STATS,
FEED_EXPAND,
FEED_SHRINK
@@ -145,7 +147,7 @@
@Override
public int compareTo(FeedActivity o) {
- return this.activityId - o.getActivityId();
+ return Integer.compare(o.getActivityId(), this.activityId);
}
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
index 96f1dca..9c0d3e5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
@@ -25,9 +25,9 @@
private static final long serialVersionUID = 1L;
- private final Map<String, Object> alteredConfParams;
+ private final Map<String, String> alteredConfParams;
- public AlterFeedMessage(Map<String, Object> alteredConfParams) {
+ public AlterFeedMessage(Map<String, String> alteredConfParams) {
super(MessageType.ALTER);
this.alteredConfParams = alteredConfParams;
}
@@ -37,7 +37,7 @@
return MessageType.ALTER;
}
- public Map<String, Object> getAlteredConfParams() {
+ public Map<String, String> getAlteredConfParams() {
return alteredConfParams;
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
index a74627c..2d40000 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ConditionalPushTupleParserFactory.java
@@ -43,7 +43,7 @@
}
private final ARecordType recordType;
- private final Map<String, Object> configuration;
+ private final Map<String, String> configuration;
private IValueParserFactory[] valueParserFactories;
private char delimiter;
private final ParserType parserType;
@@ -55,7 +55,7 @@
}
public ConditionalPushTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, Map<String, Object> configuration) {
+ char fieldDelimiter, Map<String, String> configuration) {
this.recordType = recordType;
this.valueParserFactories = valueParserFactories;
this.delimiter = fieldDelimiter;
@@ -64,7 +64,7 @@
}
- public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
+ public ConditionalPushTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
this.recordType = recordType;
this.configuration = configuration;
this.parserType = ParserType.ADM;
@@ -88,7 +88,7 @@
public static final String BATCH_INTERVAL = "batch-interval";
public ConditionalPushTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
- Map<String, Object> configuration) {
+ Map<String, String> configuration) {
super(ctx, recType);
this.dataParser = dataParser;
String propValue = (String) configuration.get(BATCH_SIZE);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 3893e54..0d46e43 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -20,7 +20,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -82,8 +81,14 @@
}
FeedManager.INSTANCE.deRegisterFeedRuntime(adapterRuntimeMgr);
} catch (InterruptedException ie) {
- // check policy
- adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
+ if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Continuing on failure as per feed policy");
+ }
+ adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
+ } else {
+ throw new HyracksDataException(ie);
+ }
} catch (Exception e) {
e.printStackTrace();
throw new HyracksDataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 03daa145..b02e05b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -14,6 +14,9 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -24,6 +27,8 @@
*/
public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+ private static final Logger LOGGER = Logger.getLogger(FeedMessageOperatorNodePushable.class.getName());
+
private final FeedId feedId;
private final IFeedMessage feedMessage;
private final int partition;
@@ -43,6 +48,9 @@
if (adapterRuntimeMgr != null) {
switch (feedMessage.getMessageType()) {
case END:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending feed:" + feedId);
+ }
adapterRuntimeMgr.stop();
break;
case ALTER:
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
index cc8e165..5ea79d0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
@@ -67,11 +67,6 @@
public AdapterType getAdapterType();
/**
- * @param configuration
- */
- public void configure(Map<String, Object> configuration) throws Exception;
-
- /**
* Returns a list of partition constraints. A partition constraint can be a
* requirement to execute at a particular location or could be cardinality
* constraints indicating the number of instances that need to run in
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
index 9c5a612..164d292 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
@@ -38,6 +38,6 @@
* A HashMap containing the set of configuration parameters
* that need to be altered.
*/
- public void alter(Map<String, Object> properties);
+ public void alter(Map<String, String> properties);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
index a871aba..76ea93a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -1,8 +1,12 @@
package edu.uci.ics.asterix.metadata.feeds;
+import java.util.Map;
+
import edu.uci.ics.asterix.om.types.ARecordType;
public interface ITypedAdapterFactory extends IAdapterFactory {
public ARecordType getAdapterOutputType();
+
+ public void configure(Map<String, String> configuration) throws Exception;
}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 96cba42..81fd78d 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -15,6 +15,7 @@
package edu.uci.ics.asterix.om.util;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -25,8 +26,12 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
/**
* A holder class for properties related to the Asterix cluster.
@@ -45,6 +50,8 @@
public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
private final Cluster cluster;
+ private AlgebricksAbsolutePartitionConstraint clusterPartitionConstraint;
+
private AsterixClusterProperties() {
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
if (is != null) {
@@ -71,6 +78,7 @@
public void removeNCConfiguration(String nodeId) {
// state = State.UNUSABLE;
ncConfiguration.remove(nodeId);
+ resetClusterPartitionConstraint();
}
public void addNCConfiguration(String nodeId, Map<String, String> configuration) {
@@ -82,6 +90,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" Registering configuration parameters for node id" + nodeId);
}
+ resetClusterPartitionConstraint();
}
/**
@@ -118,4 +127,27 @@
return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
}
+ public AlgebricksPartitionConstraint getClusterLocations() {
+ if (clusterPartitionConstraint == null) {
+ resetClusterPartitionConstraint();
+ }
+ return clusterPartitionConstraint;
+ }
+
+ private void resetClusterPartitionConstraint() {
+ Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+ ArrayList<String> locs = new ArrayList<String>();
+ for (String i : stores.keySet()) {
+ String[] nodeStores = stores.get(i);
+ int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
+ for (int j = 0; j < nodeStores.length; j++) {
+ for (int k = 0; k < numIODevices; k++) {
+ locs.add(i);
+ }
+ }
+ }
+ String[] cluster = new String[locs.size()];
+ cluster = locs.toArray(cluster);
+ clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(cluster);
+ }
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
index b693073..bd456a2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -24,13 +24,13 @@
private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
private SocketFeedServer socketFeedServer;
private static final int DEFAULT_PORT = 2909;
- public GenericSocketFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+ public GenericSocketFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
super(parserFactory, outputtype, ctx);
this.configuration = configuration;
@@ -53,7 +53,7 @@
private ServerSocket serverSocket;
private InputStream inputStream;
- public SocketFeedServer(Map<String, Object> configuration, ARecordType outputtype, int port)
+ public SocketFeedServer(Map<String, String> configuration, ARecordType outputtype, int port)
throws IOException, AsterixException {
try {
serverSocket = new ServerSocket(port);
@@ -90,7 +90,7 @@
}
@Override
- public void alter(Map<String, Object> properties) {
+ public void alter(Map<String, String> properties) {
// TODO Auto-generated method stub
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 961e519..788bb4f 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -18,7 +18,7 @@
import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -30,7 +30,7 @@
* Data received is transformed into Asterix Data Format (ADM) and stored into
* a dataset a configured in the Adapter configuration.
*/
-public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements ITypedAdapterFactory {
+public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
/**
*
@@ -55,9 +55,9 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
- outputType = (ARecordType) configuration.get(KEY_SOURCE_DATATYPE);
+ this.outputType = outputType;
this.configureFormat(outputType);
}
@@ -71,9 +71,4 @@
return new GenericSocketFeedAdapter(configuration, parserFactory, outputType, ctx);
}
- @Override
- public ARecordType getAdapterOutputType() {
- return outputType;
- }
-
}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 81d19ae..80565b2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -35,7 +35,7 @@
private static final long serialVersionUID = 1L;
private FileSystemBasedAdapter coreAdapter;
- public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+ public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration,
FileSystemBasedAdapter coreAdapter, String format, ITupleParserFactory parserFactory,
IHyracksTaskContext ctx) throws Exception {
super(parserFactory, atype, ctx);
@@ -48,8 +48,8 @@
}
@Override
- public void alter(Map<String, Object> properties) {
- ((RateControlledTupleParser) tupleParser).setInterTupleInterval(Long.parseLong((String) properties
+ public void alter(Map<String, String> properties) {
+ ((RateControlledTupleParser) tupleParser).setInterTupleInterval(Long.parseLong(properties
.get(RateControlledTupleParser.INTER_TUPLE_INTERVAL)));
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index dbe5d9a..1de9c9f 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -19,12 +19,12 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
@@ -47,7 +47,8 @@
* on the local file system or on HDFS. The feed ends when the content of the
* source file has been ingested.
*/
-public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory {
+public class RateControlledFileSystemBasedAdapterFactory extends StreamBasedAdapterFactory implements
+ IGenericAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String KEY_FILE_SYSTEM = "fs";
@@ -56,9 +57,9 @@
public static final String KEY_PATH = "path";
public static final String KEY_FORMAT = "format";
- private IAdapterFactory adapterFactory;
+ private IGenericAdapterFactory adapterFactory;
private String format;
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
private ARecordType atype;
@Override
@@ -72,7 +73,7 @@
return "file_feed";
}
- private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+ private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
if (configuration.get(KEY_FILE_SYSTEM) == null) {
throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
}
@@ -98,7 +99,7 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType recordType) throws Exception {
this.configuration = configuration;
checkRequiredArgs(configuration);
String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
@@ -110,11 +111,11 @@
} else {
throw new AsterixException("Unsupported file system type " + fileSystem);
}
- format = (String) configuration.get(KEY_FORMAT);
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
- adapterFactory.configure(configuration);
+ format = configuration.get(KEY_FORMAT);
+ adapterFactory = (IGenericAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+ adapterFactory.configure(configuration, recordType);
- atype = (ARecordType) configuration.get(KEY_SOURCE_DATATYPE);
+ atype = recordType;
configureFormat();
}
@@ -163,7 +164,7 @@
private static final long serialVersionUID = 1L;
private final ARecordType recordType;
- private final Map<String, Object> configuration;
+ private final Map<String, String> configuration;
private IValueParserFactory[] valueParserFactories;
private char delimiter;
private final ParserType parserType;
@@ -174,7 +175,7 @@
}
public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, Map<String, Object> configuration) {
+ char fieldDelimiter, Map<String, String> configuration) {
this.recordType = recordType;
this.valueParserFactories = valueParserFactories;
this.delimiter = fieldDelimiter;
@@ -182,7 +183,7 @@
this.parserType = ParserType.DELIMITED_DATA;
}
- public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
+ public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
this.recordType = recordType;
this.configuration = configuration;
this.parserType = ParserType.ADM;
@@ -214,10 +215,10 @@
public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
- Map<String, Object> configuration) {
+ Map<String, String> configuration) {
super(ctx, recType);
this.dataParser = dataParser;
- String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
+ String propValue = configuration.get(INTER_TUPLE_INTERVAL);
if (propValue != null) {
interTupleInterval = Long.parseLong(propValue);
} else {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 0e58ed2..d26e764 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -36,9 +36,9 @@
private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedAdapter.class.getName());
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
- public SyntheticTwitterFeedAdapter(Map<String, Object> configuration, ARecordType outputType,
+ public SyntheticTwitterFeedAdapter(Map<String, String> configuration, ARecordType outputType,
IHyracksTaskContext ctx) throws AsterixException {
super(configuration, ctx);
this.configuration = configuration;
@@ -71,7 +71,7 @@
private int tweetCountBeforeException = 0;
private int exceptionPeriod = -1;
- public SyntheticTwitterFeedClient(Map<String, Object> configuration, ARecordType outputRecordType, int partition)
+ public SyntheticTwitterFeedClient(Map<String, String> configuration, ARecordType outputRecordType, int partition)
throws AsterixException {
this.outputRecordType = outputRecordType;
String value = (String) configuration.get(KEY_DURATION);
@@ -164,7 +164,7 @@
}
@Override
- public boolean alter(Map<String, Object> configuration) {
+ public boolean alter(Map<String, String> configuration) {
// TODO Auto-generated method stub
return false;
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index afb57bf..408d648 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -49,7 +49,7 @@
*/
private static final long serialVersionUID = 1L;
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
@@ -71,7 +71,7 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws Exception {
this.configuration = configuration;
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index 9cae374..9d47de2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -2,11 +2,8 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
-import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -56,13 +53,13 @@
private byte[] EOL = "\n".getBytes();
private OutputStream os;
- public TweetGenerator(Map<String, Object> configuration, ARecordType outputRecordType, int partition, String format)
+ public TweetGenerator(Map<String, String> configuration, ARecordType outputRecordType, int partition, String format)
throws AsterixException {
this.outputRecordType = outputRecordType;
- String value = (String) configuration.get(KEY_DURATION);
+ String value = configuration.get(KEY_DURATION);
duration = value != null ? Integer.parseInt(value) : 60;
- initializeTweetRate((String) configuration.get(KEY_TPS));
- value = (String) configuration.get(KEY_EXCEPTION_PERIOD);
+ initializeTweetRate(configuration.get(KEY_TPS));
+ value = configuration.get(KEY_EXCEPTION_PERIOD);
if (value != null) {
exceptionPeriod = Integer.parseInt(value);
}
@@ -158,7 +155,7 @@
}
@Override
- public boolean alter(Map<String, Object> configuration) {
+ public boolean alter(Map<String, String> configuration) {
// TODO Auto-generated method stub
return false;
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index c8fa860..a0d13ed 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -30,7 +30,7 @@
private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseFeedAdapter.class.getName());
- private Map<String, Object> configuration;
+ private Map<String, String> configuration;
private TweetGenerator twitterFeedClient;
@@ -41,7 +41,9 @@
private static final String LOCALHOST = "127.0.0.1";
private static final int PORT = 2909;
- public TwitterFirehoseFeedAdapter(Map<String, Object> configuration, ITupleParserFactory parserFactory,
+ public static final String SIMULATE_UNREACHABLE_SERVER = "simulate-unreachable-server";
+
+ public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
super(parserFactory, outputtype, ctx);
this.configuration = configuration;
@@ -68,7 +70,7 @@
private final Listener listener;
private int port = -1;
- public TwitterServer(Map<String, Object> configuration, ARecordType outputtype) throws IOException,
+ public TwitterServer(Map<String, String> configuration, ARecordType outputtype) throws IOException,
AsterixException {
int numAttempts = 0;
while (port < 0) {
@@ -128,7 +130,7 @@
private TweetGenerator tweetGenerator;
private boolean continuePush = true;
- public Listener(ServerSocket serverSocket, Map<String, Object> configuration, ARecordType outputtype)
+ public Listener(ServerSocket serverSocket, Map<String, String> configuration, ARecordType outputtype)
throws IOException, AsterixException {
this.serverSocket = serverSocket;
this.tweetGenerator = new TweetGenerator(configuration, outputtype, 0,
@@ -173,7 +175,7 @@
}
@Override
- public void alter(Map<String, Object> properties) {
+ public void alter(Map<String, String> properties) {
// TODO Auto-generated method stub
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 9500436..ca16dd3 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -71,7 +71,7 @@
}
@Override
- public void configure(Map<String, Object> configuration) throws Exception {
+ public void configure(Map<String, String> configuration) throws Exception {
this.configuration = configuration;
configuration.put(KEY_FORMAT, FORMAT_ADM);
this.configureFormat(initOutputType());