Add fault-tolerance test case for feed resumption after a cluster restart
diff --git a/asterix-app/pom.xml b/asterix-app/pom.xml
index 30e7f55..06d289e 100644
--- a/asterix-app/pom.xml
+++ b/asterix-app/pom.xml
@@ -86,7 +86,6 @@
<additionalClasspathElement>${basedir}/src/main/resources</additionalClasspathElement>
</additionalClasspathElements> -->
<forkMode>pertest</forkMode>
- <skipTests>true</skipTests>
<argLine>-enableassertions -Xmx${test.heap.size}m
-Dfile.encoding=UTF-8
-Djava.util.logging.config.file=src/test/resources/logging.properties
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 1d8ba1d..e1eb8c4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -712,7 +712,7 @@
break;
case REVIVAL_POST_NODE_REJOIN:
try {
- Thread.sleep(2000);
+ Thread.sleep(10000);
} catch (InterruptedException e1) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Attempt to resume feed interrupted");
@@ -739,14 +739,18 @@
MetadataTransactionContext ctx = null;
try {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Attempting to Resume feeds!");
- }
- Thread.sleep(2000);
+
+ Thread.sleep(4000);
MetadataManager.INSTANCE.init();
ctx = MetadataManager.INSTANCE.beginTransaction();
List<FeedActivity> activeFeeds = MetadataManager.INSTANCE.getActiveFeeds(ctx);
- MetadataManager.INSTANCE.commitTransaction(ctx);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Attempt to resume feeds that were active prior to instance shutdown!");
+ LOGGER.info("Number of feeds affected:" + activeFeeds.size());
+ for (FeedActivity fa : activeFeeds) {
+ LOGGER.info("Active feed " + fa.getDataverseName() + ":" + fa.getDatasetName());
+ }
+ }
for (FeedActivity fa : activeFeeds) {
String feedPolicy = fa.getFeedActivityDetails().get(FeedActivityDetails.FEED_POLICY_NAME);
FeedPolicy policy = MetadataManager.INSTANCE.getFeedPolicy(ctx, fa.getDataverseName(), feedPolicy);
@@ -758,8 +762,8 @@
LOGGER.severe("Unable to resume feed: " + fa.getDataverseName() + ":"
+ fa.getDatasetName() + "." + " Unknown policy :" + feedPolicy);
}
+ continue;
}
- continue;
}
FeedPolicyAccessor fpa = new FeedPolicyAccessor(policy.getProperties());
@@ -779,6 +783,7 @@
}
}
}
+ MetadataManager.INSTANCE.commitTransaction(ctx);
} catch (Exception e) {
e.printStackTrace();
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixRuntimeState.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixRuntimeState.java
index efada48..b8fbabb 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixRuntimeState.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixRuntimeState.java
@@ -19,6 +19,7 @@
public class AsterixRuntimeState implements Serializable {
+ private static final long serialVersionUID = 1L;
private final List<ProcessInfo> processes;
private final List<String> failedNCs;
private final boolean ccRunning;
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StartNodeCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StartNodeCommand.java
index 6daf4fc..7d0a6ee 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StartNodeCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StartNodeCommand.java
@@ -24,6 +24,7 @@
import edu.uci.ics.asterix.event.model.AsterixInstance;
import edu.uci.ics.asterix.event.model.AsterixInstance.State;
import edu.uci.ics.asterix.event.model.AsterixRuntimeState;
+import edu.uci.ics.asterix.event.model.ProcessInfo;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.event.schema.pattern.Pattern;
@@ -33,6 +34,7 @@
import edu.uci.ics.asterix.event.service.ServiceProvider;
import edu.uci.ics.asterix.event.util.PatternCreator;
import edu.uci.ics.asterix.installer.driver.InstallerDriver;
+import edu.uci.ics.asterix.installer.error.InstallerException;
public class StartNodeCommand extends AbstractCommand {
@@ -45,9 +47,17 @@
Cluster cluster = instance.getCluster();
List<Pattern> pl = new ArrayList<Pattern>();
+ AsterixRuntimeState runtimeState = VerificationUtil.getAsterixRuntimeState(instance);
String[] nodesToBeAdded = ((StartNodeConfig) config).nodes.split(",");
+ List<String> aliveNodes = new ArrayList<String>();
+ for (ProcessInfo p : runtimeState.getProcesses()) {
+ aliveNodes.add(p.getNodeId());
+ }
List<Node> clusterNodes = cluster.getNode();
for (String n : nodesToBeAdded) {
+ if (aliveNodes.contains(n)) {
+ throw new InstallerException("Node: " + n + " is already alive");
+ }
for (Node node : clusterNodes) {
if (n.equals(node.getId())) {
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
@@ -61,7 +71,7 @@
Patterns patterns = new Patterns(pl);
AsterixEventServiceClient client = AsterixEventService.getAsterixEventServiceClient(cluster);
client.submit(patterns);
- AsterixRuntimeState runtimeState = VerificationUtil.getAsterixRuntimeState(instance);
+ runtimeState = VerificationUtil.getAsterixRuntimeState(instance);
VerificationUtil.updateInstanceWithRuntimeDescription(instance, runtimeState, true);
LOGGER.info(instance.getDescription(false));
ServiceProvider.INSTANCE.getLookupService().updateAsterixInstance(instance);
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StopNodeCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StopNodeCommand.java
index a0dc7c5..c7cc6de 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StopNodeCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/StopNodeCommand.java
@@ -20,9 +20,12 @@
import org.kohsuke.args4j.Option;
+import edu.uci.ics.asterix.event.error.VerificationUtil;
import edu.uci.ics.asterix.event.management.AsterixEventServiceClient;
import edu.uci.ics.asterix.event.model.AsterixInstance;
import edu.uci.ics.asterix.event.model.AsterixInstance.State;
+import edu.uci.ics.asterix.event.model.AsterixRuntimeState;
+import edu.uci.ics.asterix.event.model.ProcessInfo;
import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.event.schema.pattern.Pattern;
import edu.uci.ics.asterix.event.schema.pattern.Patterns;
@@ -46,6 +49,12 @@
.getCluster());
String[] nodesToStop = ((StopNodeConfig) config).nodeList.split(",");
+ AsterixRuntimeState runtimeState = VerificationUtil.getAsterixRuntimeState(asterixInstance);
+ List<String> aliveNodes = new ArrayList<String>();
+ for (ProcessInfo p : runtimeState.getProcesses()) {
+ aliveNodes.add(p.getNodeId());
+ }
+
List<String> validNodeIds = new ArrayList<String>();
for (Node node : asterixInstance.getCluster().getNode()) {
validNodeIds.add(node.getId());
@@ -55,6 +64,10 @@
- if (!nodeId.contains(nodeId)) {
+ // Check the id against the cluster's declared nodes; a String always contains itself, so the old test never failed.
+ if (!validNodeIds.contains(nodeId)) {
throw new InstallerException("Invalid nodeId: " + nodeId);
}
+ if (!aliveNodes.contains(nodeId)) {
+ throw new InstallerException("Node: " + nodeId + " is not alive");
+ }
ncKillPatterns.add(PatternCreator.INSTANCE.createNCStopPattern(nodeId, asterixInstanceName + "_" + nodeId));
}
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
new file mode 100644
index 0000000..5d357c6
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
@@ -0,0 +1,28 @@
+drop dataverse feeds if exists;
+create dataverse feeds;
+use dataverse feeds;
+
+create type TwitterUserType as closed {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as closed {
+ tweetid: string,
+ user: TwitterUserType,
+ sender-location: point,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string
+}
+
+
+create feed dataset TwitterFirehoseFeed(TweetMessageType)
+using twitter_firehose
+(("duration"="30"),("tps"="50"),("dataverse-dataset"="feeds:TwitterFirehoseFeed"))
+primary key tweetid;
+
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
new file mode 100644
index 0000000..382425f
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.2.update.aql
@@ -0,0 +1,4 @@
+use dataverse feeds;
+
+begin feed TwitterFirehoseFeed;
+
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.mgx.aql
new file mode 100644
index 0000000..2d8a23e
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.mgx.aql
@@ -0,0 +1 @@
+stop -n asterix
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
new file mode 100644
index 0000000..4e99f33
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
@@ -0,0 +1 @@
+start -n asterix
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.query.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.query.aql
new file mode 100644
index 0000000..474835a
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.query.aql
@@ -0,0 +1,4 @@
+use dataverse feeds;
+
+for $x in dataset Metadata.FeedActivity
+return $x
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.query.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.query.aql
new file mode 100644
index 0000000..8ce6cb9
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.query.aql
@@ -0,0 +1,4 @@
+use dataverse feeds;
+
+count(for $x in dataset TwitterFirehoseFeed
+return $x)
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml b/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
index 859f29a..145c304 100644
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/testsuite.xml
@@ -5,6 +5,11 @@
<output-dir compare="Text">IN2-1</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="IN1-cluster-restart">
+ <output-dir compare="Text">IN1-cluster-restart</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index b02e05b..4ffa8a9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -52,6 +52,7 @@
LOGGER.info("Ending feed:" + feedId);
}
adapterRuntimeMgr.stop();
+ FeedManager.INSTANCE.deRegisterFeedRuntime(adapterRuntimeMgr);
break;
case ALTER:
adapterRuntimeMgr.getFeedAdapter().alter(