syncing azure adapter
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index e88a37b..db77812 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -174,5 +174,10 @@
+		<dependency>
+			<groupId></groupId>
+			<artifactId>microsoft-windowsazure-api</artifactId>
+			<version>0.4.4</version>
+		</dependency>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/
new file mode 100644
index 0000000..201058b
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/
@@ -0,0 +1,108 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.Map;
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedAzureTwitterAdapter;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Datatype;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+public class PullBasedAzureTwitterAdapterFactory implements ITypedAdapterFactory {
+    private static final long serialVersionUID = 1L;
+    private static final String OUTPUT_TYPE_KEY = "output-type";
+    private ARecordType recordType;
+    private Map<String, String> configuration;
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+    @Override
+    public String getName() {
+        return "azure_twitter";
+    }
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        return new PullBasedAzureTwitterAdapter(configuration, ctx, recordType);
+    }
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return recordType;
+    }
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        String fqOutputType = configuration.get(OUTPUT_TYPE_KEY);
+        if (fqOutputType == null) {
+            throw new IllegalArgumentException("No output type specified");
+        }
+        String[] dataverseAndType = fqOutputType.split("[.]");
+        String dataverse = dataverseAndType[0];
+        String datatype = dataverseAndType[1];
+        MetadataTransactionContext ctx = null;
+        try {
+            MetadataManager.INSTANCE.acquireReadLatch();
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverse, datatype);
+            IAType type = t.getDatatype();
+            if (type.getTypeTag() != ATypeTag.RECORD) {
+                throw new IllegalStateException();
+            }
+            recordType = (ARecordType) t.getDatatype();
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            if (ctx != null) {
+                MetadataManager.INSTANCE.abortTransaction(ctx);
+            }
+            throw e;
+        } finally {
+            MetadataManager.INSTANCE.releaseReadLatch();
+        }
+        //        } else {
+        //            String[] coordFieldNames = { "coordinates", "type" };
+        //            IAType[] coordFieldTypes = { new AOrderedListType(BuiltinType.ADOUBLE, "coord_type"), BuiltinType.ASTRING };
+        //            ARecordType coordRecType = new ARecordType("coord_rec_type", coordFieldNames, coordFieldTypes, true);
+        //            AUnionType coordType = new AUnionType(Arrays.asList(new IAType[] { coordRecType, BuiltinType.ANULL }),
+        //                    "coord_type");
+        //
+        //            AUnionType langType = new AUnionType(
+        //                    Arrays.asList(new IAType[] { BuiltinType.ASTRING, BuiltinType.ANULL }), "lang_type");
+        //            String[] userFieldNames = { "id", "id_str", "created_at", "followers_count", "lang", "location" };
+        //            IAType[] userFieldTypes = { BuiltinType.AINT64, BuiltinType.ASTRING, BuiltinType.ASTRING,
+        //                    BuiltinType.AINT32, langType, BuiltinType.ASTRING };
+        //            ARecordType userRecType = new ARecordType("user_rec_type", userFieldNames, userFieldTypes, true);
+        //
+        //            String[] fieldNames = { "posting_id", "user_id", /*"coordinates",*/"created_at", "id", "id_str", /*"lang",*/
+        //            "retweet_count", "text"/*, "user"*/};
+        //            IAType[] fieldTypes = { BuiltinType.AINT64, BuiltinType.AINT32, /*coordType,*/BuiltinType.ASTRING,
+        //                    BuiltinType.AINT64, BuiltinType.ASTRING, /*langType,*/BuiltinType.AINT32, BuiltinType.ASTRING /*, userRecType*/};
+        //            recordType = new ARecordType("W4TwitterType", fieldNames, fieldTypes, false);
+        //        }
+    }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/
new file mode 100644
index 0000000..ed98abf
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/
@@ -0,0 +1,33 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+public class AzureTweetEntity extends TableServiceEntity {
+    private String postingType;
+    private String json;
+    public AzureTweetEntity() {
+    }
+    public AzureTweetEntity(String userID, String postingID) {
+        this.partitionKey = userID;
+        this.rowKey = postingID;
+    }
+    public String getPostingType() {
+        return postingType;
+    }
+    public void setPostingType(String postingType) {
+        this.postingType = postingType;
+    }
+    public void setJSON(String json) {
+        this.json = json;
+    }
+    public String getJSON() {
+        return json;
+    }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/
new file mode 100644
index 0000000..36c04a4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/
@@ -0,0 +1,134 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.util.Map;
+import org.json.JSONObject;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+public class PullBasedAzureFeedClient implements IPullBasedFeedClient {
+    private final ARecordType outputType;
+    private final CloudStorageAccount csa;
+    private final CloudTableClient ctc;
+    private Iterator<AzureTweetEntity> tweets;
+    private final IAObject[] mutableFields;
+    private final AMutableRecord coordRec;
+    private final ARecordType coordType;
+    private final AMutableRecord userRec;
+    private final ARecordType userType;
+    private final IValueParser dtParser;
+    private final String[] dateFormats = { "EEE MMM dd kk:mm:ss XXX yyyy", "EEE MMM dd kk:mm:ss ZZZZZ yyyy" };
+    private final DateFormat admDateFormat;
+    private final ResettableByteArrayOutputStream rbaos;
+    private final DataOutputStream dos;
+    private final ADMDataParser adp;
+    private final ByteArrayAccessibleInputStream baais;
+    public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType) throws AsterixException {
+        this.outputType = outputType;
+        this.csa = csa;
+        ctc = csa.createCloudTableClient();
+        try {
+            coordType = (ARecordType) outputType.getFieldType("coordinates");
+            coordRec = new AMutableRecord(coordType, null);
+            userType = (ARecordType) outputType.getFieldType("user");
+            userRec = new AMutableRecord(userType, null);
+            mutableFields = new IAObject[] { new AMutableInt64(-1), new AMutableInt64(-1), /*coordRec,*/
+            new AMutableDateTime(-1), new AMutableInt64(-1), new AMutableString(null),
+            /*new AMutableString(null),*/new AMutableInt32(-1), new AMutableString(null) /*, userRec */};
+        } catch (IOException e) {
+            throw new AsterixException(e);
+        }
+        admDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX");
+        dtParser = ADateTimeParserFactory.INSTANCE.createValueParser();
+        rbaos = new ResettableByteArrayOutputStream();
+        dos = new DataOutputStream(rbaos);
+        baais = new ByteArrayAccessibleInputStream(rbaos.getByteArray(), 0, 0);
+        adp = new ADMDataParser();
+        adp.initialize(baais, outputType, false);
+    }
+    @Override
+    public void resetOnFailure(Exception e) throws AsterixException {
+        e.printStackTrace();
+    }
+    @Override
+    public boolean alter(Map<String, String> configuration) {
+        return false;
+    }
+    @Override
+    public void stop() {
+    }
+    @Override
+    public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
+        if (tweets == null) {
+            TableQuery<AzureTweetEntity> tweetQuery = TableQuery.from("Postings", AzureTweetEntity.class);
+            tweets = ctc.execute(tweetQuery).iterator();
+        }
+        boolean moreTweets = tweets.hasNext();
+        if (moreTweets) {
+            AzureTweetEntity tweet =;
+            try {
+                JSONObject tjo = new JSONObject(tweet.getJSON().toString());
+                System.out.println(tjo);
+                tjo.remove("id");
+                JSONObject utjo = tjo.getJSONObject("user");
+                utjo.remove("id");
+                tjo.put("user", utjo);
+                String tjs = tjo.toString().replaceAll("}}", "} }");
+                System.out.println(tjo.getString("id_str") + " " + utjo.getString("id_str"));
+                byte[] tjb = tjs.getBytes(StandardCharsets.UTF_8);
+                rbaos.reset();
+                dos.write(tjb, 0, tjb.length);
+                dos.flush();
+                baais.setContent(rbaos.getByteArray(), 0, rbaos.getByteArray().length);
+                adp.initialize(baais, outputType, false);
+                //                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                //                DataOutputStream dos = new DataOutputStream(baos);
+                adp.parse(dataOutput);
+//                dataOutput.write(baos.toByteArray());
+//                //                adp.parse(dataOutput);
+//                ISerializerDeserializer serde = AqlSerializerDeserializerProvider.INSTANCE
+//                        .getSerializerDeserializer(outputType);
+//                ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+//                DataInputStream dis = new DataInputStream(bais);
+//                IAObject o = (IAObject) serde.deserialize(dis);
+//                System.out.println(o);
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new AsterixException(e);
+            }
+        }
+        return moreTweets ? InflowState.DATA_AVAILABLE : InflowState.NO_MORE_DATA;
+    }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/
new file mode 100644
index 0000000..12628e4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/
@@ -0,0 +1,50 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+import java.util.Map;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+public class PullBasedAzureTwitterAdapter extends PullBasedAdapter implements IDatasourceAdapter {
+    private static final long serialVersionUID = 1L;
+    private static final String ACCOUNT_NAME_KEY = "account_name";
+    private static final String ACCOUNT_KEY_KEY = "account_key";
+    private final CloudStorageAccount csa;
+    private final String connectionString;
+    private final String azureAccountName;
+    private final String azureAccountKey;
+    private final PullBasedAzureFeedClient feedClient;
+    public PullBasedAzureTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx,
+            ARecordType outputType) throws AsterixException {
+        super(configuration, ctx);
+        azureAccountName = configuration.get(ACCOUNT_NAME_KEY);
+        azureAccountKey = configuration.get(ACCOUNT_KEY_KEY);
+        if (azureAccountName == null || azureAccountKey == null) {
+            throw new IllegalArgumentException("You must specify a valid Azure account name and key");
+        }
+        connectionString = "DefaultEndpointsProtocol=http;" + "AccountName=" + azureAccountName + ";AccountKey="
+                + azureAccountKey + ";";
+        try {
+            csa = CloudStorageAccount.parse(connectionString);
+        } catch (InvalidKeyException | URISyntaxException e) {
+            throw new IllegalArgumentException("You must specify a valid Azure account name and key", e);
+        }
+        feedClient = new PullBasedAzureFeedClient(csa, outputType);
+    }
+    @Override
+    public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
+        return feedClient;
+    }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/
index 2d60c41..7484737 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/
@@ -313,6 +313,7 @@
     private static void insertInitialAdapters(MetadataTransactionContext mdTxnCtx) throws Exception {
         String[] builtInAdapterClassNames = new String[] {
+                "edu.uci.ics.asterix.external.adapter.factory.PullBasedAzureTwitterAdapterFactory",
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/
index 89368e7..1beb01d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/
@@ -438,8 +438,9 @@
             switch (adapterFactory.getAdapterType()) {
                 case TYPED:
-                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
                     ((ITypedAdapterFactory) adapterFactory).configure(configuration);
+                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
                 case GENERIC:
                     String outputTypeName = configuration.get("output-type-name");