Enable feeds to ingest partitions of the data stored in Azure
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index 283fc1a..6c84e2b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -19,6 +19,7 @@
private static final long serialVersionUID = 1L;
+ private static final String INGESTOR_CARDINALITY_KEY = "ingestor-cardinality";
private static final String OUTPUT_TYPE_KEY = "output-type";
private ARecordType recordType;
@@ -41,7 +42,9 @@
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(2);
+ // The number of parallel ingestors is read from the adapter configuration;
+ // a single ingestor is used when the key is absent.
+ String cardinalityStr = configuration.get(INGESTOR_CARDINALITY_KEY);
+ if (cardinalityStr == null) {
+ return new AlgebricksCountPartitionConstraint(1);
+ }
+ int cardinality;
+ try {
+ cardinality = Integer.parseInt(cardinalityStr.trim());
+ } catch (NumberFormatException e) {
+ // Surface the offending key and value instead of a bare NumberFormatException.
+ throw new IllegalArgumentException("'" + INGESTOR_CARDINALITY_KEY + "' must be a positive integer, got: "
+ + cardinalityStr, e);
+ }
+ if (cardinality < 1) {
+ throw new IllegalArgumentException("'" + INGESTOR_CARDINALITY_KEY + "' must be a positive integer, got: "
+ + cardinalityStr);
+ }
+ return new AlgebricksCountPartitionConstraint(cardinality);
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
index ee1f56e..efd321b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
@@ -15,7 +15,10 @@
import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
import com.microsoft.windowsazure.services.table.client.CloudTableClient;
+import com.microsoft.windowsazure.services.table.client.TableConstants;
import com.microsoft.windowsazure.services.table.client.TableQuery;
+import com.microsoft.windowsazure.services.table.client.TableQuery.Operators;
+import com.microsoft.windowsazure.services.table.client.TableQuery.QueryComparisons;
import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -40,11 +43,11 @@
private final ADMDataParser adp;
private final ByteArrayAccessibleInputStream baais;
- public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName)
- throws AsterixException {
+ public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName, String lowKey,
+ String highKey) throws AsterixException {
this.tableName = tableName;
this.outputType = outputType;
- this.tableQuery = TableQuery.from(tableName, classFromString(tableName));
+ this.tableQuery = configureTableQuery(tableName, lowKey, highKey);
this.ctc = csa.createCloudTableClient();
rbaos = new ResettableByteArrayOutputStream();
dos = new DataOutputStream(rbaos);
@@ -53,6 +56,20 @@
adp.initialize(baais, outputType, false);
}
+ /**
+ * Builds the table query for this client, optionally restricted to a range of partition keys.
+ *
+ * @param tableName name of the table to query; also selects the entity class via classFromString
+ * @param lowKey inclusive lower bound on the partition key, or null for no lower bound
+ * @param highKey inclusive upper bound on the partition key, or null for no upper bound
+ * @return the unfiltered query when both bounds are null, otherwise the query filtered by the given bound(s)
+ */
+ private TableQuery<? extends TableServiceEntity> configureTableQuery(String tableName, String lowKey, String highKey) {
+ TableQuery<? extends TableServiceEntity> baseTQ = TableQuery.from(tableName, classFromString(tableName));
+ String lowKeyPredicate = lowKey == null ? null : TableQuery.generateFilterCondition(
+ TableConstants.PARTITION_KEY, QueryComparisons.GREATER_THAN_OR_EQUAL, lowKey);
+ String highKeyPredicate = highKey == null ? null : TableQuery.generateFilterCondition(
+ TableConstants.PARTITION_KEY, QueryComparisons.LESS_THAN_OR_EQUAL, highKey);
+ if (lowKeyPredicate != null && highKeyPredicate != null) {
+ return baseTQ.where(TableQuery.combineFilters(lowKeyPredicate, Operators.AND, highKeyPredicate));
+ }
+ // A single bound is still honoured rather than silently falling back to a full-table scan.
+ if (lowKeyPredicate != null) {
+ return baseTQ.where(lowKeyPredicate);
+ }
+ if (highKeyPredicate != null) {
+ return baseTQ.where(highKeyPredicate);
+ }
+
+ return baseTQ;
+ }
+
private Class<? extends TableServiceEntity> classFromString(String tableName) {
return tableName.equals("Postings") ? AzureTweetEntity.class : AzureTweetMetadataEntity.class;
}
@@ -81,7 +98,6 @@
if (moreTweets) {
try {
String json = getJSONString().replaceAll("}}", "}, \"z\":null }");
- System.out.println(json);
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
rbaos.reset();
dos.write(jsonBytes, 0, jsonBytes.length);
@@ -101,6 +117,8 @@
if (tableName.equals("Postings")) {
AzureTweetEntity tweet = (AzureTweetEntity) entityIt.next();
JSONObject tjo = new JSONObject(tweet.getJSON().toString());
+ tjo.put("posting_id", tweet.getRowKey());
+ tjo.put("user_id", tweet.getPartitionKey());
tjo.remove("id");
JSONObject utjo = tjo.getJSONObject("user");
utjo.remove("id");
@@ -109,9 +127,11 @@
} else if (tableName.equals("PostingMetadata")) {
AzureTweetMetadataEntity tweetMD = (AzureTweetMetadataEntity) entityIt.next();
JSONObject tmdjo = new JSONObject();
+ tmdjo.put("posting_id", tweetMD.getRowKey());
+ tmdjo.put("user_id", tweetMD.getPartitionKey());
tmdjo.put("created_at", stripTillColon(tweetMD.getCreationTimestamp()).replaceAll("\"", ""));
tmdjo.put("posting_type", stripTillColon(tweetMD.getPostingType()));
- List<String> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
+ List<Integer> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
tmdjo.put("product_id", productIdList);
if (tweetMD.getEthnicity() != null) {
tmdjo.put("ethnicity", new JSONObject(stripTillColon(tweetMD.getEthnicity())));
@@ -144,19 +164,14 @@
return str.substring(str.indexOf(':') + 1);
}
- private String[] extractArray(String str) {
+ /**
+ * Extracts the integer values captured by the "vals" group of arrayPattern.
+ *
+ * @param str text expected to contain an array that arrayPattern can match
+ * @return the parsed values in order; an empty array when the captured list is empty
+ * @throws IllegalArgumentException if str contains nothing that arrayPattern matches
+ * @throws NumberFormatException if a captured element is not a valid integer
+ */
+ private Integer[] extractArray(String str) {
Matcher m = arrayPattern.matcher(str);
- m.find();
- return m.group("vals").replaceAll("\\s", "").split(",");
- }
-
- public static void main(String[] args) throws Exception {
- Pattern int32Pattern = Pattern.compile(/*"(?<int>\\d+)(?!\\.\\d*,)"*/":(?<int>\\d+)(,|})");
- String locStr = "\"location\":{\"Latitude\":52.25,\"Longitude\":21}";
- Matcher m = int32Pattern.matcher(locStr);
- while (m.find()) {
- locStr = locStr.replace(m.group("int"), m.group("int") + ".0");
+ if (!m.find()) {
+ // Without this check m.group() fails with an uninformative "No match found".
+ throw new IllegalArgumentException("Cannot extract array values from: " + str);
+ }
+ String vals = m.group("vals").replaceAll("\\s", "");
+ if (vals.isEmpty()) {
+ // "".split(",") yields a single empty string, which Integer.parseInt rejects.
+ return new Integer[0];
+ }
+ String[] stringNums = vals.split(",");
+ Integer[] nums = new Integer[stringNums.length];
+ for (int i = 0; i < nums.length; ++i) {
+ nums[i] = Integer.parseInt(stringNums[i]);
}
- System.out.println(locStr);
+ return nums;
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index c96fe7f..5cdd55a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -3,6 +3,7 @@
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Map;
+import java.util.logging.Logger;
import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
@@ -12,27 +13,41 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public class PullBasedAzureTwitterAdapter extends PullBasedAdapter implements IDatasourceAdapter {
+ private static final Logger LOGGER = Logger.getLogger(PullBasedAzureTwitterAdapter.class.getName());
private static final long serialVersionUID = 1L;
- private static final String ACCOUNT_NAME_KEY = "account_name";
- private static final String ACCOUNT_KEY_KEY = "account_key";
- private static final String TABLE_NAME_KEY = "table_name";
+ private static final String ACCOUNT_NAME_KEY = "account-name";
+ private static final String ACCOUNT_KEY_KEY = "account-key";
+ private static final String TABLE_NAME_KEY = "table-name";
+ private static final String PARTITIONS_KEY = "partitions";
private final CloudStorageAccount csa;
private final String connectionString;
private final String azureAccountName;
private final String azureAccountKey;
+ private final ARecordType outputType;
+ private final String tableName;
+ private final boolean partitioned;
- private final PullBasedAzureFeedClient feedClient;
+ private String[] lowKeys;
+ private String[] highKeys;
public PullBasedAzureTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx,
ARecordType outputType) throws AsterixException {
super(configuration, ctx);
- String tableName = configuration.get(TABLE_NAME_KEY);
+ this.outputType = outputType;
+ this.tableName = configuration.get(TABLE_NAME_KEY);
if (tableName == null) {
throw new IllegalArgumentException("You must specify a valid table name");
}
+ String partitionsString = configuration.get(PARTITIONS_KEY);
+ if (partitionsString != null) {
+ partitioned = true;
+ configurePartitions(partitionsString);
+ } else {
+ partitioned = false;
+ }
azureAccountName = configuration.get(ACCOUNT_NAME_KEY);
azureAccountKey = configuration.get(ACCOUNT_KEY_KEY);
if (azureAccountName == null || azureAccountKey == null) {
@@ -45,11 +60,24 @@
} catch (InvalidKeyException | URISyntaxException e) {
throw new IllegalArgumentException("You must specify a valid Azure account name and key", e);
}
- feedClient = new PullBasedAzureFeedClient(csa, outputType, tableName);
+ }
+
+ /**
+ * Parses the partition specification into the parallel lowKeys/highKeys arrays.
+ * The specification is a comma-separated list of lowKey:highKey entries; entry i
+ * supplies the key range at index i of both arrays.
+ *
+ * @param partitionsString the raw value of the partitions configuration entry
+ * @throws IllegalArgumentException if an entry does not contain both a low and a high key
+ */
+ private void configurePartitions(String partitionsString) {
+ String[] partitions = partitionsString.split(",");
+ lowKeys = new String[partitions.length];
+ highKeys = new String[partitions.length];
+ for (int i = 0; i < partitions.length; ++i) {
+ String[] loHi = partitions[i].split(":");
+ // split() drops trailing empty strings, so "a" and "a:" both yield a single element.
+ if (loHi.length < 2) {
+ throw new IllegalArgumentException("Invalid entry '" + partitions[i] + "' in '" + PARTITIONS_KEY
+ + "'; expected <lowKey>:<highKey>");
+ }
+ lowKeys[i] = loHi[0];
+ highKeys[i] = loHi[1];
+ }
}
@Override
public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
- return feedClient;
+ // Each call creates a fresh client. When key ranges were configured, the client
+ // is restricted to the range at index 'partition'; otherwise it is unfiltered.
+ if (partitioned) {
+ if (partition < 0 || partition >= lowKeys.length) {
+ // Fail with a diagnostic instead of an ArrayIndexOutOfBoundsException when more
+ // ingestors are requested than key ranges were configured.
+ throw new IllegalArgumentException("No key range configured for partition " + partition + "; '"
+ + PARTITIONS_KEY + "' defines " + lowKeys.length + " range(s)");
+ }
+ return new PullBasedAzureFeedClient(csa, outputType, tableName, lowKeys[partition], highKeys[partition]);
+ }
+ return new PullBasedAzureFeedClient(csa, outputType, tableName, null, null);
}
}