1) modified tweet schema to have tweet ids as int64. 2) Added GUID (Long) generator for generating cluster wide unique Ids
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index 6880308..bf89ccb 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -80,7 +80,7 @@
}
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, recordType, ctx);
return cnnFeedAdapter;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index af057c9..f81c0d5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -88,7 +88,7 @@
}
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
JobConf conf = confFactory.getConf();
InputSplit[] inputSplits = inputSplitsFactory.getSplits();
String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 991dadb..054fee3 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -82,7 +82,7 @@
}
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
JobConf conf = confFactory.getConf();
InputSplit[] inputSplits = inputSplitsFactory.getSplits();
String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 7c7b2a3..2e68588 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -49,7 +49,7 @@
private FileSplit[] fileSplits;
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx);
return fsAdapter;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index 9c137e1..81b73e7 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -39,7 +39,7 @@
private static ARecordType recordType;
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
return new PullBasedTwitterAdapter(configuration, ctx);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index cc366f6..c7f25b0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -45,7 +45,7 @@
private List<String> feedURLs = new ArrayList<String>();
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
return rssFeedAdapter;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index c5937f6..48c9a2c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -51,7 +51,7 @@
writer.open();
IDatasourceAdapter adapter = null;
try {
- adapter = ((IAdapterFactory) adapterFactory).createAdapter(ctx);
+ adapter = ((IAdapterFactory) adapterFactory).createAdapter(ctx, partition);
adapter.start(partition, writer);
} catch (Exception e) {
throw new HyracksDataException("exception during reading from external data source", e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 10884c2..1792e1d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -70,7 +70,7 @@
IngestionRuntime ingestionRuntime = (IngestionRuntime) FeedManager.INSTANCE.getFeedRuntime(feedRuntimeId);
try {
if (ingestionRuntime == null) {
- adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx);
+ adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Beginning new feed:" + feedId);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
index da4f131..f0a3aa4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
@@ -82,9 +82,10 @@
* Creates an instance of IDatasourceAdapter.
*
* @param HyracksTaskContext
+ * @param partition
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception;
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception;
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator2.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator2.java
index ad7daaf..3fa7170 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator2.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator2.java
@@ -198,23 +198,27 @@
message = randMessageGen.getNextRandomMessage();
Point location = randLocationGen.getRandomPoint();
DateTime sendTime = randDateGen.getNextRandomDatetime();
- twMessage.reset(twMessageId + "", user, location, sendTime, message.getReferredTopics(), message);
+ twMessage.reset(twMessageId, user, location, sendTime, message.getReferredTopics(), message);
twMessageId++;
appender.appendToFile(twMessage.toString());
}
}
- public Iterator<TweetMessage> getTwitterMessageIterator() {
- return new TweetMessageIterator(duration);
+ public Iterator<TweetMessage> getTwitterMessageIterator(int partition, byte seed) {
+ return new TweetMessageIterator(duration, partition, seed);
}
public class TweetMessageIterator implements Iterator<TweetMessage> {
private final int duration;
private long startTime = 0;
-
- public TweetMessageIterator(int duration) {
+ private int partition;
+ private final GULongIDGenerator idGen;
+
+ public TweetMessageIterator(int duration,int partition, byte seed) {
this.duration = duration;
+ this.partition = partition;
+ this.idGen = new GULongIDGenerator(partition, seed);
}
@Override
@@ -231,7 +235,7 @@
Message message = randMessageGen.getNextRandomMessage();
Point location = randLocationGen.getRandomPoint();
DateTime sendTime = randDateGen.getNextRandomDatetime();
- twMessage.reset(UUID.randomUUID().toString(), twUser, location, sendTime, message.getReferredTopics(),
+ twMessage.reset(idGen.getNextULong(), twUser, location, sendTime, message.getReferredTopics(),
message);
twMessageId++;
if (twUserId > numTwOnlyUsers) {
@@ -1273,7 +1277,7 @@
public static class TweetMessage {
- private String tweetid;
+ private long tweetid;
private TwitterUser user;
private Point senderLocation;
private DateTime sendTime;
@@ -1284,7 +1288,7 @@
}
- public TweetMessage(String tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
+ public TweetMessage(long tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
List<String> referredTopics, Message messageText) {
this.tweetid = tweetid;
this.user = user;
@@ -1294,7 +1298,7 @@
this.messageText = messageText;
}
- public void reset(String tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
+ public void reset(long tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
List<String> referredTopics, Message messageText) {
this.tweetid = tweetid;
this.user = user;
@@ -1308,7 +1312,7 @@
StringBuilder builder = new StringBuilder();
builder.append("{");
builder.append("\"tweetid\":");
- builder.append("\"" + tweetid + "\"");
+ builder.append("int64(\"" + tweetid + "\")");
builder.append(",");
builder.append("\"user\":");
builder.append(user);
@@ -1340,11 +1344,11 @@
return new String(builder);
}
- public String getTweetid() {
+ public long getTweetid() {
return tweetid;
}
- public void setTweetid(String tweetid) {
+ public void setTweetid(long tweetid) {
this.tweetid = tweetid;
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GULongIDGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GULongIDGenerator.java
new file mode 100644
index 0000000..59c133c
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GULongIDGenerator.java
@@ -0,0 +1,32 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class GULongIDGenerator {
+
+ private final int partition;
+ private final long baseValue;
+ private final AtomicLong nextValue;
+
+ public GULongIDGenerator(int partition, byte seed) {
+ this.partition = partition;
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.put(seed);
+ buffer.put((byte) partition);
+ buffer.putInt(0);
+ buffer.putShort((short) 0);
+ buffer.flip();
+ this.baseValue = new Long(buffer.getLong());
+ this.nextValue = new AtomicLong(baseValue);
+ }
+
+ public long getNextULong() {
+ return nextValue.incrementAndGet();
+ }
+
+ public int getPartition(){
+ return partition;
+ }
+
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 788bb4f..03c65c7 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -67,7 +67,7 @@
}
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
return new GenericSocketFeedAdapter(configuration, parserFactory, outputType, ctx);
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 40bc814..fb2ac1bd 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -63,8 +63,8 @@
private ARecordType atype;
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
- FileSystemBasedAdapter coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(ctx);
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ FileSystemBasedAdapter coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(ctx, partition);
return new RateControlledFileSystemBasedAdapter(atype, configuration, coreAdapter, format, parserFactory, ctx);
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 11cbe54..baa7a63 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -106,7 +106,7 @@
}
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
return new SyntheticTwitterFeedAdapter(configuration, outputType, ctx);
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
index 6d0b1f9..b4db589 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
@@ -18,12 +18,10 @@
public static final String KEY_TPS = "tps";
public static final String KEY_MIN_TPS = "tps-min";
public static final String KEY_MAX_TPS = "tps-max";
-
-
public static final String KEY_TPUT_DURATION = "tput-duration";
-
- public static final String OUTPUT_FORMAT = "output-format";
+ public static final String KEY_GUID_SEED = "guid-seed";
+ public static final String OUTPUT_FORMAT = "output-format";
public static final String OUTPUT_FORMAT_ARECORD = "arecord";
public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
@@ -32,18 +30,10 @@
private int numTweetsBeforeDelay;
private TweetMessageIterator tweetIterator = null;
private long exeptionInterval;
-
-
-
private int partition;
private int tweetCount = 0;
private int frameTweetCount = 0;
private int numFlushedTweets = 0;
-
- public int getTweetCount() {
- return tweetCount;
- }
-
private int exceptionPeriod = -1;
private boolean isOutputFormatRecord = false;
private byte[] EOL = "\n".getBytes();
@@ -52,6 +42,10 @@
private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
private int flushedTweetCount = 0;
+ public int getTweetCount() {
+ return tweetCount;
+ }
+
public TweetGenerator2(Map<String, String> configuration, int partition, String format) throws Exception {
String value = configuration.get(KEY_DURATION);
duration = value != null ? Integer.parseInt(value) : 60;
@@ -59,12 +53,15 @@
if (value != null) {
exceptionPeriod = Integer.parseInt(value);
}
-
+
isOutputFormatRecord = format.equalsIgnoreCase(OUTPUT_FORMAT_ARECORD);
InitializationInfo info = new InitializationInfo();
info.timeDurationInSecs = duration;
dataGenerator = new DataGenerator2(info);
- tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+
+ String seedValue = configuration.get(KEY_GUID_SEED);
+ int seedInt = seedValue != null ? Integer.parseInt(seedValue) : 0;
+ tweetIterator = dataGenerator.new TweetMessageIterator(duration, partition, (byte) seedInt);
}
private void initializeTweetRate(String tps) {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index 5739c70..be09685 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -43,9 +43,9 @@
private ExecutorService executorService = Executors.newCachedThreadPool();
public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
- ARecordType outputtype, IHyracksTaskContext ctx) throws Exception {
+ ARecordType outputtype, IHyracksTaskContext ctx, int partition) throws Exception {
super(parserFactory, outputtype, ctx);
- this.twitterServer = new TwitterServer(configuration, outputtype, executorService);
+ this.twitterServer = new TwitterServer(configuration, outputtype, executorService, partition);
this.twitterClient = new TwitterClient(twitterServer.getPort());
}
@@ -68,7 +68,7 @@
private int port = -1;
private ExecutorService executorService;
- public TwitterServer(Map<String, String> configuration, ARecordType outputtype, ExecutorService executorService)
+ public TwitterServer(Map<String, String> configuration, ARecordType outputtype, ExecutorService executorService, int partition)
throws Exception {
int numAttempts = 0;
while (port < 0) {
@@ -86,7 +86,7 @@
LOGGER.info("Twitter server configured to use port: " + port);
}
String dvds = configuration.get("dataverse-dataset");
- listener = new Listener(serverSocket, configuration, outputtype, dvds);
+ listener = new Listener(serverSocket, configuration, outputtype, dvds, partition);
this.executorService = executorService;
}
@@ -155,9 +155,9 @@
}
public Listener(ServerSocket serverSocket, Map<String, String> configuration, ARecordType outputtype,
- String datasetName) throws Exception {
+ String datasetName, int partition) throws Exception {
this.serverSocket = serverSocket;
- this.tweetGenerator = new TweetGenerator2(configuration, 0, TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
+ this.tweetGenerator = new TweetGenerator2(configuration, partition, TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
String value = configuration.get(KEY_MODE);
String confValue = null;
if (value != null) {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 1d8a607..b18a5f2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -107,8 +107,8 @@
}
@Override
- public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
- return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx);
+ public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+ return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx, partition);
}
@Override
@@ -130,7 +130,7 @@
"message-text" };
AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, "referred-topics");
- IAType[] fieldTypes = new IAType[] { BuiltinType.ASTRING, userRecordType, BuiltinType.APOINT,
+ IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, userRecordType, BuiltinType.APOINT,
BuiltinType.ADATETIME, unorderedListType, BuiltinType.ASTRING };
outputType = new ARecordType("TweetMessageType", fieldNames, fieldTypes, false);