re-orchestrated feeds fault tolerance integration test
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 10cb29d..7c614c0 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4272,7 +4272,6 @@
</test-case>
</test-group>
<test-group name="load">
- <!--
<test-case FilePath="load">
<compilation-unit name="csv_01">
<output-dir compare="Text">csv_01</output-dir>
@@ -4283,7 +4282,6 @@
<output-dir compare="Text">csv_02</output-dir>
</compilation-unit>
</test-case>
- -->
<test-case FilePath="load">
<compilation-unit name="issue14_query">
<output-dir compare="Text">none</output-dir>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 02ffdda..8d578f7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -1,6 +1,7 @@
package edu.uci.ics.asterix.external.adapter.factory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -13,6 +14,7 @@
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AUnionType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
@@ -62,7 +64,19 @@
int n = recordType.getFieldTypes().length;
IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
for (int i = 0; i < n; i++) {
- ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+ ATypeTag tag = null;
+ if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
+ List<IAType> unionTypes = ((AUnionType) recordType.getFieldTypes()[i]).getUnionList();
+ if (unionTypes.size() != 2 && unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
+ throw new NotImplementedException("Non-optional UNION type is not supported.");
+ }
+ tag = unionTypes.get(1).getTypeTag();
+ } else {
+ tag = recordType.getFieldTypes()[i].getTypeTag();
+ }
+ if (tag == null) {
+ throw new NotImplementedException("Failed to get the type information for field " + i + ".");
+ }
IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
if (vpf == null) {
throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
@@ -116,5 +130,4 @@
}
-
}
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
index 2b3ce0d..1f0678e 100644
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.1.ddl.aql
@@ -12,7 +12,7 @@
}
create type TweetMessageType as closed {
- tweetid: string,
+ tweetid: int64,
user: TwitterUserType,
sender-location: point,
send-time: datetime,
@@ -25,4 +25,4 @@
create feed TwitterFirehose
using twitter_firehose
-(("duration"="5"),("tps"="50"));
+(("duration"="30"),("tps"="50"),("tput-duration"="5"),("mode"="controlled"));
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.mgx.aql
deleted file mode 100644
index 2d8a23e..0000000
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.mgx.aql
+++ /dev/null
@@ -1 +0,0 @@
-stop -n asterix
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
new file mode 100644
index 0000000..5caff40
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.3.sleep.aql
@@ -0,0 +1 @@
+10000
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
index 4e99f33..2d8a23e 100644
--- a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.4.mgx.aql
@@ -1 +1 @@
-start -n asterix
+stop -n asterix
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
new file mode 100644
index 0000000..4e99f33
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.mgx.aql
@@ -0,0 +1 @@
+start -n asterix
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.sleep.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
similarity index 100%
rename from asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.5.sleep.aql
rename to asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.sleep.aql
diff --git a/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.query.aql b/asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql
similarity index 100%
rename from asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.6.query.aql
rename to asterix-installer/src/test/resources/integrationts/fault-tolerance/queries/feeds/IN1-cluster-restart/in1-cluster-restart.7.query.aql