Enable feeds to ingest partitions of the data stored in Azure
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index 283fc1a..6c84e2b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -19,6 +19,7 @@
 
     private static final long serialVersionUID = 1L;
 
+    private static final String INGESTOR_CARDINALITY_KEY = "ingestor-cardinality";
     private static final String OUTPUT_TYPE_KEY = "output-type";
 
     private ARecordType recordType;
@@ -41,7 +42,28 @@
 
+    /**
+     * Builds a count constraint from the optional "ingestor-cardinality" setting,
+     * falling back to a single ingestor when the setting is absent.
+     *
+     * @throws IllegalArgumentException if the setting is not a positive integer
+     */
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(2);
+        String cardinalityStr = configuration.get(INGESTOR_CARDINALITY_KEY);
+        if (cardinalityStr == null) {
+            return new AlgebricksCountPartitionConstraint(1);
+        }
+        int cardinality;
+        try {
+            cardinality = Integer.parseInt(cardinalityStr.trim());
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    "'" + INGESTOR_CARDINALITY_KEY + "' must be a positive integer, got: " + cardinalityStr, e);
+        }
+        if (cardinality < 1) {
+            throw new IllegalArgumentException(
+                    "'" + INGESTOR_CARDINALITY_KEY + "' must be a positive integer, got: " + cardinalityStr);
+        }
+        return new AlgebricksCountPartitionConstraint(cardinality);
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
index ee1f56e..efd321b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
@@ -15,7 +15,10 @@
 
 import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
 import com.microsoft.windowsazure.services.table.client.CloudTableClient;
+import com.microsoft.windowsazure.services.table.client.TableConstants;
 import com.microsoft.windowsazure.services.table.client.TableQuery;
+import com.microsoft.windowsazure.services.table.client.TableQuery.Operators;
+import com.microsoft.windowsazure.services.table.client.TableQuery.QueryComparisons;
 import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -40,11 +43,11 @@
     private final ADMDataParser adp;
     private final ByteArrayAccessibleInputStream baais;
 
-    public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName)
-            throws AsterixException {
+    public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName, String lowKey,
+            String highKey) throws AsterixException {
         this.tableName = tableName;
         this.outputType = outputType;
-        this.tableQuery = TableQuery.from(tableName, classFromString(tableName));
+        this.tableQuery = configureTableQuery(tableName, lowKey, highKey);
         this.ctc = csa.createCloudTableClient();
         rbaos = new ResettableByteArrayOutputStream();
         dos = new DataOutputStream(rbaos);
@@ -53,6 +56,20 @@
         adp.initialize(baais, outputType, false);
     }
 
+    // Queries the whole table unless both keys are given, in which case only entities whose
+    // PartitionKey lies in [lowKey, highKey] are requested.
+    private TableQuery<? extends TableServiceEntity> configureTableQuery(String tableName, String lowKey, String highKey) {
+        TableQuery<? extends TableServiceEntity> query = TableQuery.from(tableName, classFromString(tableName));
+        if (lowKey == null || highKey == null) {
+            return query;
+        }
+        String atLeastLow = TableQuery.generateFilterCondition(TableConstants.PARTITION_KEY,
+                QueryComparisons.GREATER_THAN_OR_EQUAL, lowKey);
+        String atMostHigh = TableQuery.generateFilterCondition(TableConstants.PARTITION_KEY,
+                QueryComparisons.LESS_THAN_OR_EQUAL, highKey);
+        return query.where(TableQuery.combineFilters(atLeastLow, Operators.AND, atMostHigh));
+    }
+
     private Class<? extends TableServiceEntity> classFromString(String tableName) {
         return tableName.equals("Postings") ? AzureTweetEntity.class : AzureTweetMetadataEntity.class;
     }
@@ -81,7 +98,6 @@
         if (moreTweets) {
             try {
                 String json = getJSONString().replaceAll("}}", "}, \"z\":null }");
-                System.out.println(json);
                 byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
                 rbaos.reset();
                 dos.write(jsonBytes, 0, jsonBytes.length);
@@ -101,6 +117,8 @@
         if (tableName.equals("Postings")) {
             AzureTweetEntity tweet = (AzureTweetEntity) entityIt.next();
             JSONObject tjo = new JSONObject(tweet.getJSON().toString());
+            tjo.put("posting_id", tweet.getRowKey());
+            tjo.put("user_id", tweet.getPartitionKey());
             tjo.remove("id");
             JSONObject utjo = tjo.getJSONObject("user");
             utjo.remove("id");
@@ -109,9 +127,11 @@
         } else if (tableName.equals("PostingMetadata")) {
             AzureTweetMetadataEntity tweetMD = (AzureTweetMetadataEntity) entityIt.next();
             JSONObject tmdjo = new JSONObject();
+            tmdjo.put("posting_id", tweetMD.getRowKey());
+            tmdjo.put("user_id", tweetMD.getPartitionKey());
             tmdjo.put("created_at", stripTillColon(tweetMD.getCreationTimestamp()).replaceAll("\"", ""));
             tmdjo.put("posting_type", stripTillColon(tweetMD.getPostingType()));
-            List<String> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
+            List<Integer> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
             tmdjo.put("product_id", productIdList);
             if (tweetMD.getEthnicity() != null) {
                 tmdjo.put("ethnicity", new JSONObject(stripTillColon(tweetMD.getEthnicity())));
@@ -144,19 +164,29 @@
         return str.substring(str.indexOf(':') + 1);
     }
 
-    private String[] extractArray(String str) {
+    /**
+     * Parses the comma-separated integers captured by the "vals" group of {@code arrayPattern}.
+     *
+     * @param str text expected to contain an array literal matched by {@code arrayPattern}
+     * @return the parsed values in order of appearance; empty when the captured list is blank
+     * @throws IllegalArgumentException if {@code str} contains no match for {@code arrayPattern}
+     * @throws NumberFormatException if a captured element is not a valid integer
+     */
+    private Integer[] extractArray(String str) {
         Matcher m = arrayPattern.matcher(str);
-        m.find();
-        return m.group("vals").replaceAll("\\s", "").split(",");
-    }
-
-    public static void main(String[] args) throws Exception {
-        Pattern int32Pattern = Pattern.compile(/*"(?<int>\\d+)(?!\\.\\d*,)"*/":(?<int>\\d+)(,|})");
-        String locStr = "\"location\":{\"Latitude\":52.25,\"Longitude\":21}";
-        Matcher m = int32Pattern.matcher(locStr);
-        while (m.find()) {
-            locStr = locStr.replace(m.group("int"), m.group("int") + ".0");
+        if (!m.find()) {
+            throw new IllegalArgumentException("No array found in: " + str);
+        }
+        String vals = m.group("vals").replaceAll("\\s", "");
+        if (vals.isEmpty()) {
+            // "".split(",") yields [""], which Integer.parseInt would reject.
+            return new Integer[0];
+        }
+        String[] stringNums = vals.split(",");
+        Integer[] nums = new Integer[stringNums.length];
+        for (int i = 0; i < nums.length; ++i) {
+            nums[i] = Integer.parseInt(stringNums[i]);
         }
-        System.out.println(locStr);
+        return nums;
     }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index c96fe7f..5cdd55a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -3,6 +3,7 @@
 import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
 import java.util.Map;
+import java.util.logging.Logger;
 
 import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
 
@@ -12,27 +13,41 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 public class PullBasedAzureTwitterAdapter extends PullBasedAdapter implements IDatasourceAdapter {
+    private static final Logger LOGGER = Logger.getLogger(PullBasedAzureTwitterAdapter.class.getName());
 
     private static final long serialVersionUID = 1L;
 
-    private static final String ACCOUNT_NAME_KEY = "account_name";
-    private static final String ACCOUNT_KEY_KEY = "account_key";
-    private static final String TABLE_NAME_KEY = "table_name";
+    private static final String ACCOUNT_NAME_KEY = "account-name";
+    private static final String ACCOUNT_KEY_KEY = "account-key";
+    private static final String TABLE_NAME_KEY = "table-name";
+    private static final String PARTITIONS_KEY = "partitions";
 
     private final CloudStorageAccount csa;
     private final String connectionString;
     private final String azureAccountName;
     private final String azureAccountKey;
+    private final ARecordType outputType;
+    private final String tableName;
+    private final boolean partitioned;
 
-    private final PullBasedAzureFeedClient feedClient;
+    private String[] lowKeys;
+    private String[] highKeys;
 
     public PullBasedAzureTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx,
             ARecordType outputType) throws AsterixException {
         super(configuration, ctx);
-        String tableName = configuration.get(TABLE_NAME_KEY);
+        this.outputType = outputType;
+        this.tableName = configuration.get(TABLE_NAME_KEY);
         if (tableName == null) {
             throw new IllegalArgumentException("You must specify a valid table name");
         }
+        String partitionsString = configuration.get(PARTITIONS_KEY);
+        if (partitionsString != null) {
+            partitioned = true;
+            configurePartitions(partitionsString);
+        } else {
+            partitioned = false;
+        }
         azureAccountName = configuration.get(ACCOUNT_NAME_KEY);
         azureAccountKey = configuration.get(ACCOUNT_KEY_KEY);
         if (azureAccountName == null || azureAccountKey == null) {
@@ -45,11 +60,44 @@
         } catch (InvalidKeyException | URISyntaxException e) {
             throw new IllegalArgumentException("You must specify a valid Azure account name and key", e);
         }
-        feedClient = new PullBasedAzureFeedClient(csa, outputType, tableName);
+    }
+
+    /**
+     * Splits the "partitions" setting into per-partition low/high key arrays.
+     *
+     * @param partitionsString comma-separated list of {@code low:high} key ranges
+     * @throws IllegalArgumentException if an entry is not of the form {@code low:high}
+     */
+    private void configurePartitions(String partitionsString) {
+        String[] partitions = partitionsString.split(",");
+        lowKeys = new String[partitions.length];
+        highKeys = new String[partitions.length];
+        for (int i = 0; i < partitions.length; ++i) {
+            String[] loHi = partitions[i].split(":");
+            if (loHi.length != 2) {
+                throw new IllegalArgumentException(
+                        "Partition '" + partitions[i] + "' must have the form <lowKey>:<highKey>");
+            }
+            lowKeys[i] = loHi[0];
+            highKeys[i] = loHi[1];
+        }
     }
 
+    /**
+     * Creates a feed client for the given partition: bounded by that partition's key range
+     * when "partitions" was configured, otherwise with no key bounds.
+     *
+     * @throws IllegalArgumentException if no key range was configured for {@code partition}
+     */
     @Override
     public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
-        return feedClient;
+        if (partitioned) {
+            if (partition < 0 || partition >= lowKeys.length) {
+                throw new IllegalArgumentException("No key range configured for partition " + partition
+                        + "; '" + PARTITIONS_KEY + "' defines " + lowKeys.length + " range(s)");
+            }
+            return new PullBasedAzureFeedClient(csa, outputType, tableName, lowKeys[partition], highKeys[partition]);
+        }
+        return new PullBasedAzureFeedClient(csa, outputType, tableName, null, null);
     }
 }