add feed ingestion for posting metadata
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index 201058b..1184318 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -41,7 +41,7 @@
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        return new AlgebricksCountPartitionConstraint(1);
+        return new AlgebricksCountPartitionConstraint(2);
     }
 
     @Override
@@ -84,25 +84,5 @@
         } finally {
             MetadataManager.INSTANCE.releaseReadLatch();
         }
-        //        } else {
-        //            String[] coordFieldNames = { "coordinates", "type" };
-        //            IAType[] coordFieldTypes = { new AOrderedListType(BuiltinType.ADOUBLE, "coord_type"), BuiltinType.ASTRING };
-        //            ARecordType coordRecType = new ARecordType("coord_rec_type", coordFieldNames, coordFieldTypes, true);
-        //            AUnionType coordType = new AUnionType(Arrays.asList(new IAType[] { coordRecType, BuiltinType.ANULL }),
-        //                    "coord_type");
-        //
-        //            AUnionType langType = new AUnionType(
-        //                    Arrays.asList(new IAType[] { BuiltinType.ASTRING, BuiltinType.ANULL }), "lang_type");
-        //            String[] userFieldNames = { "id", "id_str", "created_at", "followers_count", "lang", "location" };
-        //            IAType[] userFieldTypes = { BuiltinType.AINT64, BuiltinType.ASTRING, BuiltinType.ASTRING,
-        //                    BuiltinType.AINT32, langType, BuiltinType.ASTRING };
-        //            ARecordType userRecType = new ARecordType("user_rec_type", userFieldNames, userFieldTypes, true);
-        //
-        //            String[] fieldNames = { "posting_id", "user_id", /*"coordinates",*/"created_at", "id", "id_str", /*"lang",*/
-        //            "retweet_count", "text"/*, "user"*/};
-        //            IAType[] fieldTypes = { BuiltinType.AINT64, BuiltinType.AINT32, /*coordType,*/BuiltinType.ASTRING,
-        //                    BuiltinType.AINT64, BuiltinType.ASTRING, /*langType,*/BuiltinType.AINT32, BuiltinType.ASTRING /*, userRecType*/};
-        //            recordType = new ARecordType("W4TwitterType", fieldNames, fieldTypes, false);
-        //        }
     }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
index f5175a8..666c5e5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
@@ -3,14 +3,20 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.json.JSONException;
 import org.json.JSONObject;
 
 import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
 import com.microsoft.windowsazure.services.table.client.CloudTableClient;
 import com.microsoft.windowsazure.services.table.client.TableQuery;
+import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
@@ -19,18 +25,26 @@
 import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
 
 public class PullBasedAzureFeedClient implements IPullBasedFeedClient {
+    private final String tableName;
     private final ARecordType outputType;
     private final CloudTableClient ctc;
-    private Iterator<AzureTweetEntity> tweets;
+    private final TableQuery<? extends TableServiceEntity> tableQuery;
+    private Iterator<? extends TableServiceEntity> entityIt;
+
+    private final Pattern arrayPattern = Pattern.compile("\\[(?<vals>.*)\\]");
+    private final Pattern int32Pattern = Pattern.compile(":(?<int>\\d+)(,|})");
 
     private final ResettableByteArrayOutputStream rbaos;
     private final DataOutputStream dos;
     private final ADMDataParser adp;
     private final ByteArrayAccessibleInputStream baais;
 
-    public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType) throws AsterixException {
+    public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType, String tableName)
+            throws AsterixException {
+        this.tableName = tableName;
         this.outputType = outputType;
-        ctc = csa.createCloudTableClient();
+        this.tableQuery = TableQuery.from(tableName, classFromString(tableName));
+        this.ctc = csa.createCloudTableClient();
         rbaos = new ResettableByteArrayOutputStream();
         dos = new DataOutputStream(rbaos);
         baais = new ByteArrayAccessibleInputStream(rbaos.getByteArray(), 0, 0);
@@ -38,6 +52,10 @@
         adp.initialize(baais, outputType, false);
     }
 
+    private Class<? extends TableServiceEntity> classFromString(String tableName) {
+        return tableName.equals("Postings") ? AzureTweetEntity.class : AzureTweetMetadataEntity.class;
+    }
+
     @Override
     public void resetOnFailure(Exception e) throws AsterixException {
         e.printStackTrace();
@@ -54,26 +72,20 @@
 
     @Override
     public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
-        if (tweets == null) {
-            TableQuery<AzureTweetEntity> tweetQuery = TableQuery.from("Postings", AzureTweetEntity.class);
-            tweets = ctc.execute(tweetQuery).iterator();
+        if (entityIt == null) {
+            entityIt = ctc.execute(tableQuery).iterator();
         }
 
-        boolean moreTweets = tweets.hasNext();
+        boolean moreTweets = entityIt.hasNext();
         if (moreTweets) {
-            AzureTweetEntity tweet = tweets.next();
             try {
-                JSONObject tjo = new JSONObject(tweet.getJSON().toString());
-                tjo.remove("id");
-                JSONObject utjo = tjo.getJSONObject("user");
-                utjo.remove("id");
-                tjo.put("user", utjo);
-                String tjs = tjo.toString().replaceAll("}}", "}, \"z\":null }");
-                byte[] tjb = tjs.getBytes(StandardCharsets.UTF_8);
+                String json = getJSONString().replaceAll("}}", "}, \"z\":null }");
+                System.out.println(json);
+                byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
                 rbaos.reset();
-                dos.write(tjb, 0, tjb.length);
+                dos.write(jsonBytes, 0, jsonBytes.length);
                 dos.flush();
-                baais.setContent(rbaos.getByteArray(), 0, tjb.length);
+                baais.setContent(rbaos.getByteArray(), 0, jsonBytes.length);
                 adp.initialize(baais, outputType, false);
                 adp.parse(dataOutput);
             } catch (Exception e) {
@@ -83,4 +95,63 @@
         }
         return moreTweets ? InflowState.DATA_AVAILABLE : InflowState.NO_MORE_DATA;
     }
+
+    private String getJSONString() throws JSONException {
+        if (tableName.equals("Postings")) {
+            AzureTweetEntity tweet = (AzureTweetEntity) entityIt.next();
+            JSONObject tjo = new JSONObject(tweet.getJSON().toString());
+            tjo.remove("id");
+            JSONObject utjo = tjo.getJSONObject("user");
+            utjo.remove("id");
+            tjo.put("user", utjo);
+            return tjo.toString();
+        } else if (tableName.equals("PostingMetadata")) {
+            AzureTweetMetadataEntity tweetMD = (AzureTweetMetadataEntity) entityIt.next();
+            JSONObject tmdjo = new JSONObject();
+            tmdjo.put("created_at", stripTillColon(tweetMD.getCreationTimestamp()).replaceAll("\"", ""));
+            tmdjo.put("posting_type", stripTillColon(tweetMD.getPostingType()));
+            List<String> productIdList = Arrays.asList(extractArray(tweetMD.getProductId()));
+            tmdjo.put("product_id", productIdList);
+            if (tweetMD.getEthnicity() != null) {
+                tmdjo.put("ethnicity", new JSONObject(stripTillColon(tweetMD.getEthnicity())));
+            }
+            if (tweetMD.getGender() != null) {
+                tmdjo.put("gender", new JSONObject(stripTillColon(tweetMD.getGender())));
+            }
+            if (tweetMD.getLocation() != null) {
+                String locStr = stripTillColon(tweetMD.getLocation());
+                Matcher m = int32Pattern.matcher(locStr);
+                while (m.find()) {
+                    locStr = locStr.replace(m.group("int"), m.group("int") + ".01");
+                }
+                tmdjo.put("location", new JSONObject(locStr));
+            }
+            if (tweetMD.getSentiment() != null) {
+                tmdjo.put("sentiment", stripTillColon(tweetMD.getSentiment()));
+            }
+            return tmdjo.toString();
+        } else {
+            throw new IllegalArgumentException();
+        }
+    }
+
+    private String stripTillColon(String str) {
+        return str.substring(str.indexOf(':') + 1);
+    }
+
+    private String[] extractArray(String str) {
+        Matcher m = arrayPattern.matcher(str);
+        m.find();
+        return m.group("vals").replaceAll("\\s", "").split(",");
+    }
+
+    public static void main(String[] args) throws Exception {
+        Pattern int32Pattern = Pattern.compile(/*"(?<int>\\d+)(?!\\.\\d*,)"*/":(?<int>\\d+)(,|})");
+        String locStr = "\"location\":{\"Latitude\":52.25,\"Longitude\":21}";
+        Matcher m = int32Pattern.matcher(locStr);
+        while (m.find()) {
+            locStr = locStr.replace(m.group("int"), m.group("int") + ".0");
+        }
+        System.out.println(locStr);
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index 12628e4..a2c9b2c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -17,6 +17,7 @@
 
     private static final String ACCOUNT_NAME_KEY = "account_name";
     private static final String ACCOUNT_KEY_KEY = "account_key";
+    private static final String TABLE_NAME_KEY = "table_name";
 
     private final CloudStorageAccount csa;
     private final String connectionString;
@@ -28,6 +29,7 @@
     public PullBasedAzureTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx,
             ARecordType outputType) throws AsterixException {
         super(configuration, ctx);
+        String tableName = configuration.get(TABLE_NAME_KEY);
         azureAccountName = configuration.get(ACCOUNT_NAME_KEY);
         azureAccountKey = configuration.get(ACCOUNT_KEY_KEY);
         if (azureAccountName == null || azureAccountKey == null) {
@@ -40,7 +42,7 @@
         } catch (InvalidKeyException | URISyntaxException e) {
             throw new IllegalArgumentException("You must specify a valid Azure account name and key", e);
         }
-        feedClient = new PullBasedAzureFeedClient(csa, outputType);
+        feedClient = new PullBasedAzureFeedClient(csa, outputType, tableName);
     }
 
     @Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index 16648b6..d956fd2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -106,6 +106,7 @@
                         writer.nextFrame(buffer);
                     }
                 } catch (Exception e) {
+                    e.printStackTrace();
                     if (LOGGER.isLoggable(Level.SEVERE)) {
                         LOGGER.severe("Unable to write frame " + " on behalf of " + nodePushable.getDisplayName()
                                 + ":\n" + e);