syncing azure adapter
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index e88a37b..db77812 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -174,5 +174,10 @@
<artifactId>asterix-common</artifactId>
<version>0.8.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>com.microsoft.windowsazure</groupId>
+ <artifactId>microsoft-windowsazure-api</artifactId>
+ <version>0.4.4</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
new file mode 100644
index 0000000..201058b
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -0,0 +1,127 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedAzureTwitterAdapter;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Datatype;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory that builds {@link PullBasedAzureTwitterAdapter} instances.
+ * <p>
+ * The record type emitted by the adapter is looked up in the metadata by
+ * {@link #configure(Map)}, using the fully-qualified {@code dataverse.datatype}
+ * name stored under the {@code output-type} configuration key.
+ */
+public class PullBasedAzureTwitterAdapterFactory implements ITypedAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Configuration key holding the fully-qualified output type name. */
+    private static final String OUTPUT_TYPE_KEY = "output-type";
+
+    /** Output record type; set by {@link #configure(Map)}. */
+    private ARecordType recordType;
+    /** Configuration handed to every adapter created by this factory. */
+    private Map<String, String> configuration;
+
+    /** @return always {@code SupportedOperation.READ} */
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    /** @return the adapter name, {@code azure_twitter} */
+    @Override
+    public String getName() {
+        return "azure_twitter";
+    }
+
+    /** @return always {@code AdapterType.TYPED} */
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.TYPED;
+    }
+
+    /** @return a count constraint of exactly one partition */
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(1);
+    }
+
+    /**
+     * Creates an adapter from the configuration and record type captured by
+     * {@link #configure(Map)}.
+     */
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx) throws Exception {
+        return new PullBasedAzureTwitterAdapter(configuration, ctx, recordType);
+    }
+
+    /** @return the record type resolved by {@link #configure(Map)}, or {@code null} before it ran */
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return recordType;
+    }
+
+    /**
+     * Stores the configuration and resolves the output record type.
+     *
+     * @param configuration adapter configuration; must contain {@code output-type}
+     *            in the form {@code dataverse.datatype}
+     * @throws IllegalArgumentException if the output type is missing, is not of
+     *             the form {@code dataverse.datatype}, or names an unknown datatype
+     * @throws IllegalStateException if the named datatype is not a record type
+     * @throws Exception if the metadata lookup fails; the transaction is aborted first
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.configuration = configuration;
+        String fqOutputType = configuration.get(OUTPUT_TYPE_KEY);
+
+        if (fqOutputType == null) {
+            throw new IllegalArgumentException("No output type specified");
+        }
+        String[] dataverseAndType = fqOutputType.split("[.]");
+        // split() yields fewer than two parts for names such as "Type" or "dv."
+        if (dataverseAndType.length < 2) {
+            throw new IllegalArgumentException("Output type must be of the form dataverse.datatype, got: "
+                    + fqOutputType);
+        }
+        String dataverse = dataverseAndType[0];
+        String datatype = dataverseAndType[1];
+
+        // Acquire before the try block so the latch is only released if it was taken.
+        MetadataManager.INSTANCE.acquireReadLatch();
+        MetadataTransactionContext ctx = null;
+        try {
+            ctx = MetadataManager.INSTANCE.beginTransaction();
+            Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverse, datatype);
+            if (t == null) {
+                throw new IllegalArgumentException("Unknown output type: " + fqOutputType);
+            }
+            IAType type = t.getDatatype();
+            if (type.getTypeTag() != ATypeTag.RECORD) {
+                throw new IllegalStateException("Output type " + fqOutputType + " is not a record type");
+            }
+            recordType = (ARecordType) type;
+            MetadataManager.INSTANCE.commitTransaction(ctx);
+        } catch (Exception e) {
+            if (ctx != null) {
+                MetadataManager.INSTANCE.abortTransaction(ctx);
+            }
+            throw e;
+        } finally {
+            MetadataManager.INSTANCE.releaseReadLatch();
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AzureTweetEntity.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AzureTweetEntity.java
new file mode 100644
index 0000000..ed98abf
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AzureTweetEntity.java
@@ -0,0 +1,52 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import com.microsoft.windowsazure.services.table.client.TableServiceEntity;
+
+/**
+ * Azure table entity for a single stored tweet: a posting type plus the
+ * tweet's JSON text, keyed by user id (partition key) and posting id (row key).
+ */
+public class AzureTweetEntity extends TableServiceEntity {
+
+    private String json;
+    private String postingType;
+
+    /**
+     * Creates an entity with no keys set.
+     * NOTE(review): presumably needed so the table client can instantiate
+     * entities itself -- confirm against the Azure SDK documentation.
+     */
+    public AzureTweetEntity() {
+    }
+
+    /**
+     * Creates an entity keyed by user and posting.
+     *
+     * @param userID stored as the partition key
+     * @param postingID stored as the row key
+     */
+    public AzureTweetEntity(String userID, String postingID) {
+        partitionKey = userID;
+        rowKey = postingID;
+    }
+
+    /** @return the JSON text held by this entity */
+    public String getJSON() {
+        return json;
+    }
+
+    /** @param json the JSON text to hold */
+    public void setJSON(String json) {
+        this.json = json;
+    }
+
+    /** @return the posting type */
+    public String getPostingType() {
+        return postingType;
+    }
+
+    /** @param postingType the posting type to store */
+    public void setPostingType(String postingType) {
+        this.postingType = postingType;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
new file mode 100644
index 0000000..36c04a4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
@@ -0,0 +1,160 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.json.JSONObject;
+
+import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
+import com.microsoft.windowsazure.services.table.client.CloudTableClient;
+import com.microsoft.windowsazure.services.table.client.TableQuery;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
+import edu.uci.ics.asterix.om.base.AMutableDateTime;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.base.temporal.ADateTimeParserFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.util.ResettableByteArrayOutputStream;
+import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
+
+/**
+ * Feed client that reads tweets from the Azure table {@code Postings} and
+ * hands each one, as JSON text, to an {@link ADMDataParser} configured with
+ * the feed's output record type.
+ */
+public class PullBasedAzureFeedClient implements IPullBasedFeedClient {
+    private final ARecordType outputType;
+    private final CloudStorageAccount csa;
+    private final CloudTableClient ctc;
+    /** Result cursor over the {@code Postings} table; opened lazily by the first {@link #nextTuple}. */
+    private Iterator<AzureTweetEntity> tweets;
+
+    // NOTE(review): the fields in this group are set up by the constructor (or their
+    // initialiser) and are not used anywhere else in this class.
+    private final IAObject[] mutableFields;
+    private final AMutableRecord coordRec;
+    private final ARecordType coordType;
+    private final AMutableRecord userRec;
+    private final ARecordType userType;
+    private final IValueParser dtParser;
+    private final String[] dateFormats = { "EEE MMM dd kk:mm:ss XXX yyyy", "EEE MMM dd kk:mm:ss ZZZZZ yyyy" };
+    private final DateFormat admDateFormat;
+
+    /** Scratch buffer holding the JSON bytes of the tweet currently being parsed. */
+    private final ResettableByteArrayOutputStream rbaos;
+    private final DataOutputStream dos;
+    private final ADMDataParser adp;
+    /** Input stream re-pointed at the scratch buffer's contents for every tweet. */
+    private final ByteArrayAccessibleInputStream baais;
+
+    /**
+     * Creates a client bound to the given storage account.
+     *
+     * @param csa Azure storage account used to create the table client
+     * @param outputType record type handed to the ADM parser; its
+     *            {@code coordinates} and {@code user} fields are cast to record types
+     * @throws AsterixException if looking up those field types fails with an {@link IOException}
+     */
+    public PullBasedAzureFeedClient(CloudStorageAccount csa, ARecordType outputType) throws AsterixException {
+        this.outputType = outputType;
+        this.csa = csa;
+        ctc = csa.createCloudTableClient();
+        try {
+            coordType = (ARecordType) outputType.getFieldType("coordinates");
+            coordRec = new AMutableRecord(coordType, null);
+            userType = (ARecordType) outputType.getFieldType("user");
+            userRec = new AMutableRecord(userType, null);
+            mutableFields = new IAObject[] { new AMutableInt64(-1), new AMutableInt64(-1), /*coordRec,*/
+                    new AMutableDateTime(-1), new AMutableInt64(-1), new AMutableString(null),
+                    /*new AMutableString(null),*/new AMutableInt32(-1), new AMutableString(null) /*, userRec */};
+        } catch (IOException e) {
+            throw new AsterixException(e);
+        }
+        admDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX");
+        dtParser = ADateTimeParserFactory.INSTANCE.createValueParser();
+        rbaos = new ResettableByteArrayOutputStream();
+        dos = new DataOutputStream(rbaos);
+        baais = new ByteArrayAccessibleInputStream(rbaos.getByteArray(), 0, 0);
+        adp = new ADMDataParser();
+        adp.initialize(baais, outputType, false);
+    }
+
+    /**
+     * Prints the stack trace of the failure; no state is reset.
+     */
+    @Override
+    public void resetOnFailure(Exception e) throws AsterixException {
+        e.printStackTrace();
+    }
+
+    /** @return always {@code false}; the configuration is ignored */
+    @Override
+    public boolean alter(Map<String, String> configuration) {
+        return false;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void stop() {
+    }
+
+    /**
+     * Parses the next tweet of the {@code Postings} table into {@code dataOutput}.
+     * <p>
+     * The table query is issued on the first call. Before parsing, the {@code id}
+     * members of the tweet and of its {@code user} object are removed from the
+     * JSON.
+     *
+     * @param dataOutput sink the ADM parser writes the record to
+     * @return {@code DATA_AVAILABLE} if a tweet was written, {@code NO_MORE_DATA}
+     *         once the query result is exhausted
+     * @throws AsterixException wrapping any failure while decoding or parsing a tweet
+     */
+    @Override
+    public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
+        if (tweets == null) {
+            TableQuery<AzureTweetEntity> tweetQuery = TableQuery.from("Postings", AzureTweetEntity.class);
+            tweets = ctc.execute(tweetQuery).iterator();
+        }
+
+        if (!tweets.hasNext()) {
+            return InflowState.NO_MORE_DATA;
+        }
+        AzureTweetEntity tweet = tweets.next();
+        try {
+            JSONObject tjo = new JSONObject(tweet.getJSON());
+            tjo.remove("id");
+            JSONObject utjo = tjo.getJSONObject("user");
+            utjo.remove("id");
+            tjo.put("user", utjo);
+            // Put a space between consecutive closing braces (also in runs of three or
+            // more, which a plain "}}" replacement leaves partly joined).
+            // NOTE(review): presumably needed because ADM reads "}}" as one token -- confirm.
+            String tjs = tjo.toString().replaceAll("\\}(?=\\})", "} ");
+            byte[] tjb = tjs.getBytes(StandardCharsets.UTF_8);
+            rbaos.reset();
+            dos.write(tjb, 0, tjb.length);
+            dos.flush();
+            // Expose only the bytes just written: the backing array can be longer than
+            // the current tweet and would otherwise leak stale bytes into the parser.
+            baais.setContent(rbaos.getByteArray(), 0, tjb.length);
+            adp.initialize(baais, outputType, false);
+            adp.parse(dataOutput);
+        } catch (Exception e) {
+            throw new AsterixException(e);
+        }
+        return InflowState.DATA_AVAILABLE;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
new file mode 100644
index 0000000..12628e4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -0,0 +1,74 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Map;
+
+import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Pull-based adapter that feeds tweets out of an Azure storage account through a
+ * single {@link PullBasedAzureFeedClient}.
+ * <p>
+ * The account is taken from the {@code account_name} and {@code account_key}
+ * configuration entries.
+ */
+public class PullBasedAzureTwitterAdapter extends PullBasedAdapter implements IDatasourceAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String ACCOUNT_NAME_KEY = "account_name";
+    private static final String ACCOUNT_KEY_KEY = "account_key";
+
+    private final CloudStorageAccount csa;
+    private final String connectionString;
+    private final String azureAccountName;
+    private final String azureAccountKey;
+
+    private final PullBasedAzureFeedClient feedClient;
+
+    /**
+     * Parses the Azure connection string built from the configuration and creates the feed client.
+     *
+     * @param configuration must contain {@code account_name} and {@code account_key}
+     * @param ctx task context, forwarded to the superclass
+     * @param outputType record type handed to the feed client
+     * @throws IllegalArgumentException if the account name or key is missing, or the
+     *             resulting connection string is rejected by the Azure SDK
+     * @throws AsterixException if the feed client cannot be created
+     */
+    public PullBasedAzureTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx,
+            ARecordType outputType) throws AsterixException {
+        super(configuration, ctx);
+        String accountName = configuration.get(ACCOUNT_NAME_KEY);
+        String accountKey = configuration.get(ACCOUNT_KEY_KEY);
+        if (accountName == null || accountKey == null) {
+            throw new IllegalArgumentException("You must specify a valid Azure account name and key");
+        }
+        azureAccountName = accountName;
+        azureAccountKey = accountKey;
+        // NOTE(review): the endpoint protocol is plain http -- confirm this is intended.
+        connectionString = "DefaultEndpointsProtocol=http;AccountName=" + accountName + ";AccountKey=" + accountKey
+                + ";";
+
+        CloudStorageAccount account;
+        try {
+            account = CloudStorageAccount.parse(connectionString);
+        } catch (InvalidKeyException | URISyntaxException e) {
+            throw new IllegalArgumentException("You must specify a valid Azure account name and key", e);
+        }
+        csa = account;
+        feedClient = new PullBasedAzureFeedClient(csa, outputType);
+    }
+
+    /** @return the single feed client of this adapter, regardless of {@code partition} */
+    @Override
+    public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
+        return feedClient;
+    }
+}