add feed ingestion for posting metadata
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index 201058b..1184318 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -41,7 +41,7 @@
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(1);
+ return new AlgebricksCountPartitionConstraint(2);
}
@Override
@@ -84,25 +84,5 @@
} finally {
MetadataManager.INSTANCE.releaseReadLatch();
}
- // } else {
- // String[] coordFieldNames = { "coordinates", "type" };
- // IAType[] coordFieldTypes = { new AOrderedListType(BuiltinType.ADOUBLE, "coord_type"), BuiltinType.ASTRING };
- // ARecordType coordRecType = new ARecordType("coord_rec_type", coordFieldNames, coordFieldTypes, true);
- // AUnionType coordType = new AUnionType(Arrays.asList(new IAType[] { coordRecType, BuiltinType.ANULL }),
- // "coord_type");
- //
- // AUnionType langType = new AUnionType(
- // Arrays.asList(new IAType[] { BuiltinType.ASTRING, BuiltinType.ANULL }), "lang_type");
- // String[] userFieldNames = { "id", "id_str", "created_at", "followers_count", "lang", "location" };
- // IAType[] userFieldTypes = { BuiltinType.AINT64, BuiltinType.ASTRING, BuiltinType.ASTRING,
- // BuiltinType.AINT32, langType, BuiltinType.ASTRING };
- // ARecordType userRecType = new ARecordType("user_rec_type", userFieldNames, userFieldTypes, true);
- //
- // String[] fieldNames = { "posting_id", "user_id", /*"coordinates",*/"created_at", "id", "id_str", /*"lang",*/
- // "retweet_count", "text"/*, "user"*/};
- // IAType[] fieldTypes = { BuiltinType.AINT64, BuiltinType.AINT32, /*coordType,*/BuiltinType.ASTRING,
- // BuiltinType.AINT64, BuiltinType.ASTRING, /*langType,*/BuiltinType.AINT32, BuiltinType.ASTRING /*, userRecType*/};
- // recordType = new ARecordType("W4TwitterType", fieldNames, fieldTypes, false);
- // }
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
index f5175a8..666c5e5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
@@ -3,14 +3,20 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.json.JSONException;
import org.json.JSONObject;
import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
import com.microsoft.windowsazure.services.table.client.CloudTableClient;
import com.microsoft.windowsazure.services.table.client.TableQuery;
+import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
@@ -19,18 +25,26 @@
import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
public class PullBasedAzureFeedClient implements IPullBasedFeedClient {
+ private final String tableName;
private final ARecordType outputType;
private final CloudTableClient ctc;
- private Iterator<AzureTweetEntity> tweets;
+ private final TableQuery<? extends TableServiceEntity> tableQuery;
+ private Iterator<? extends TableServiceEntity> entityIt;
+
+ private final Pattern arrayPattern = Pattern.compile("\\[(?<vals>.*)\\]");
+ private final Pattern int32Pattern = Pattern.compile(":(?<int>\\d+)(,|})");
private final ResettableByteArrayOutputStream rbaos;
private final DataOutputStream dos;
private final ADMDataParser adp;
private final ByteArrayAccessibleInputStream baais;
- public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType) throws AsterixException {
+ public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName)
+ throws AsterixException {
+ this.tableName = tableName;
this.outputType = outputType;
- ctc = csa.createCloudTableClient();
+ this.tableQuery = TableQuery.from(tableName, classFromString(tableName));
+ this.ctc = csa.createCloudTableClient();
rbaos = new ResettableByteArrayOutputStream();
dos = new DataOutputStream(rbaos);
baais = new ByteArrayAccessibleInputStream(rbaos.getByteArray(), 0, 0);
@@ -38,6 +52,10 @@
adp.initialize(baais, outputType, false);
}
+ private Class<? extends TableServiceEntity> classFromString(String tableName) {
+ return tableName.equals("Postings") ? AzureTweetEntity.class : AzureTweetMetadataEntity.class;
+ }
+
@Override
public void resetOnFailure(Exception e) throws AsterixException {
e.printStackTrace();
@@ -54,26 +72,20 @@
@Override
public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
- if (tweets == null) {
- TableQuery<AzureTweetEntity> tweetQuery = TableQuery.from("Postings", AzureTweetEntity.class);
- tweets = ctc.execute(tweetQuery).iterator();
+ if (entityIt == null) {
+ entityIt = ctc.execute(tableQuery).iterator();
}
- boolean moreTweets = tweets.hasNext();
+ boolean moreTweets = entityIt.hasNext();
if (moreTweets) {
- AzureTweetEntity tweet = tweets.next();
try {
- JSONObject tjo = new JSONObject(tweet.getJSON().toString());
- tjo.remove("id");
- JSONObject utjo = tjo.getJSONObject("user");
- utjo.remove("id");
- tjo.put("user", utjo);
- String tjs = tjo.toString().replaceAll("}}", "}, \"z\":null }");
- byte[] tjb = tjs.getBytes(StandardCharsets.UTF_8);
+ String json = getJSONString().replaceAll("}}", "}, \"z\":null }");
+ System.out.println(json);
+ byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
rbaos.reset();
- dos.write(tjb, 0, tjb.length);
+ dos.write(jsonBytes, 0, jsonBytes.length);
dos.flush();
- baais.setContent(rbaos.getByteArray(), 0, tjb.length);
+ baais.setContent(rbaos.getByteArray(), 0, jsonBytes.length);
adp.initialize(baais, outputType, false);
adp.parse(dataOutput);
} catch (Exception e) {
@@ -83,4 +95,63 @@
}
return moreTweets ? InflowState.DATA_AVAILABLE : InflowState.NO_MORE_DATA;
}
+
+ private String getJSONString() throws JSONException {
+ if (tableName.equals("Postings")) {
+ AzureTweetEntity tweet = (AzureTweetEntity) entityIt.next();
+ JSONObject tjo = new JSONObject(tweet.getJSON().toString());
+ tjo.remove("id");
+ JSONObject utjo = tjo.getJSONObject("user");
+ utjo.remove("id");
+ tjo.put("user", utjo);
+ return tjo.toString();
+ } else if (tableName.equals("PostingMetadata")) {
+ AzureTweetMetadataEntity tweetMD = (AzureTweetMetadataEntity) entityIt.next();
+ JSONObject tmdjo = new JSONObject();
+ tmdjo.put("created_at", stripTillColon(tweetMD.getCreationTimestamp()).replaceAll("\"", ""));
+ tmdjo.put("posting_type", stripTillColon(tweetMD.getPostingType()));
+ List<String> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
+ tmdjo.put("product_id", productIdList);
+ if (tweetMD.getEthnicity() != null) {
+ tmdjo.put("ethnicity", new JSONObject(stripTillColon(tweetMD.getEthnicity())));
+ }
+ if (tweetMD.getGender() != null) {
+ tmdjo.put("gender", new JSONObject(stripTillColon(tweetMD.getGender())));
+ }
+ if (tweetMD.getLocation() != null) {
+ String locStr = stripTillColon(tweetMD.getLocation());
+ Matcher m = int32Pattern.matcher(locStr);
+ while (m.find()) {
+ locStr = locStr.replace(m.group("int"), m.group("int") + ".01");
+ }
+ tmdjo.put("location", new JSONObject(locStr));
+ }
+ if (tweetMD.getSentiment() != null) {
+ tmdjo.put("sentiment", stripTillColon(tweetMD.getSentiment()));
+ }
+ return tmdjo.toString();
+ } else {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private String stripTillColon(String str) {
+ return str.substring(str.indexOf(':') + 1);
+ }
+
+ private String[] extractArray(String str) {
+ Matcher m = arrayPattern.matcher(str);
+ m.find();
+ return m.group("vals").replaceAll("\\s", "").split(",");
+ }
+
+ public static void main(String[] args) throws Exception {
+ Pattern int32Pattern = Pattern.compile(/*"(?<int>\\d+)(?!\\.\\d*,)"*/":(?<int>\\d+)(,|})");
+ String locStr = "\"location\":{\"Latitude\":52.25,\"Longitude\":21}";
+ Matcher m = int32Pattern.matcher(locStr);
+ while (m.find()) {
+ locStr = locStr.replace(m.group("int"), m.group("int") + ".0");
+ }
+ System.out.println(locStr);
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index 12628e4..a2c9b2c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -17,6 +17,7 @@
private static final String ACCOUNT_NAME_KEY = "account_name";
private static final String ACCOUNT_KEY_KEY = "account_key";
+ private static final String TABLE_NAME_KEY = "table_name";
private final CloudStorageAccount csa;
private final String connectionString;
@@ -28,6 +29,7 @@
public PullBasedAzureTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx,
ARecordType outputType) throws AsterixException {
super(configuration, ctx);
+ String tableName = configuration.get(TABLE_NAME_KEY);
azureAccountName = configuration.get(ACCOUNT_NAME_KEY);
azureAccountKey = configuration.get(ACCOUNT_KEY_KEY);
if (azureAccountName == null || azureAccountKey == null) {
@@ -40,7 +42,7 @@
} catch (InvalidKeyException | URISyntaxException e) {
throw new IllegalArgumentException("You must specify a valid Azure account name and key", e);
}
- feedClient = new PullBasedAzureFeedClient(csa, outputType);
+ feedClient = new PullBasedAzureFeedClient(csa, outputType, tableName);
}
@Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index 16648b6..d956fd2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -106,6 +106,7 @@
writer.nextFrame(buffer);
}
} catch (Exception e) {
+ e.printStackTrace();
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Unable to write frame " + " on behalf of " + nodePushable.getDisplayName()
+ ":\n" + e);