Clean up Azure adapters: move configuration parsing and validation into the adapter factory
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index 96ad2ec..b4dbe13 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -2,6 +2,7 @@
import java.util.Map;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.dataset.adapter.PullBasedAzureTwitterAdapter;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -20,10 +21,19 @@
private static final long serialVersionUID = 1L;
private static final String INGESTOR_LOCATIONS_KEY = "ingestor-locations";
+ private static final String PARTITIONS_KEY = "partitions";
private static final String OUTPUT_TYPE_KEY = "output-type";
+ private static final String TABLE_NAME_KEY = "table-name";
+ private static final String ACCOUNT_NAME_KEY = "account-name";
+ private static final String ACCOUNT_KEY_KEY = "account-key";
private ARecordType recordType;
private Map<String, String> configuration;
+ private String tableName;
+ private String azureAccountName;
+ private String azureAccountKey;
+ private String[] locations;
+ private String[] partitions;
@Override
public SupportedOperation getSupportedOperations() {
@@ -52,7 +62,8 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- return new PullBasedAzureTwitterAdapter(configuration, ctx, recordType);
+ return new PullBasedAzureTwitterAdapter(azureAccountName, azureAccountKey, tableName, partitions,
+ configuration, ctx, recordType);
}
@Override
@@ -63,6 +74,39 @@
@Override
public void configure(Map<String, String> configuration) throws Exception {
this.configuration = configuration;
+
+ tableName = configuration.get(TABLE_NAME_KEY);
+ if (tableName == null) {
+ throw new AsterixException("You must specify a valid table name");
+ }
+ azureAccountName = configuration.get(ACCOUNT_NAME_KEY);
+ azureAccountKey = configuration.get(ACCOUNT_KEY_KEY);
+ if (azureAccountName == null || azureAccountKey == null) {
+ throw new AsterixException("You must specify a valid Azure account name and key");
+ }
+
+ int nIngestLocations = 1;
+ String locationsStr = configuration.get(INGESTOR_LOCATIONS_KEY);
+ if (locationsStr != null) {
+ locations = locationsStr.split(",");
+ nIngestLocations = locations.length;
+ }
+
+ int nPartitions = 1;
+ String partitionsStr = configuration.get(PARTITIONS_KEY);
+ if (partitionsStr != null) {
+ partitions = partitionsStr.split(",");
+ nPartitions = partitions.length;
+ }
+
+ if (nIngestLocations != nPartitions) {
+ throw new AsterixException("Invalid adapter configuration: number of ingestion-locations ("
+ + nIngestLocations + ") must be the same as the number of partitions (" + nPartitions + ")");
+ }
+ configureType();
+ }
+
+ private void configureType() throws Exception {
String fqOutputType = configuration.get(OUTPUT_TYPE_KEY);
if (fqOutputType == null) {
@@ -71,9 +115,10 @@
String[] dataverseAndType = fqOutputType.split("[.]");
String dataverseName = dataverseAndType[0];
String datatypeName = dataverseAndType[1];
+
MetadataTransactionContext ctx = null;
+ MetadataManager.INSTANCE.acquireReadLatch();
try {
- MetadataManager.INSTANCE.acquireReadLatch();
ctx = MetadataManager.INSTANCE.beginTransaction();
Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverseName, datatypeName);
IAType type = t.getDatatype();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
index 60143cd..dfaee03 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureFeedClient.java
@@ -87,7 +87,7 @@
if (moreTweets) {
String json = null;
try {
- json = getJSONString().replaceAll("}}", "}, \"z\":null }");
+ json = getJSONString();
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
rbaos.reset();
dos.write(jsonBytes, 0, jsonBytes.length);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index a2f41b9..c739ca3 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -3,6 +3,7 @@
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
@@ -17,11 +18,6 @@
private static final long serialVersionUID = 1L;
- private static final String ACCOUNT_NAME_KEY = "account-name";
- private static final String ACCOUNT_KEY_KEY = "account-key";
- private static final String TABLE_NAME_KEY = "table-name";
- private static final String PARTITIONS_KEY = "partitions";
-
private final CloudStorageAccount csa;
private final String connectionString;
private final String azureAccountName;
@@ -33,43 +29,39 @@
private String[] lowKeys;
private String[] highKeys;
- public PullBasedAzureTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx,
- ARecordType outputType) throws AsterixException {
+ public PullBasedAzureTwitterAdapter(String accountName, String accountKey, String tableName, String[] partitions,
+ Map<String, String> configuration, IHyracksTaskContext ctx, ARecordType outputType) throws AsterixException {
super(configuration, ctx);
this.outputType = outputType;
- this.tableName = configuration.get(TABLE_NAME_KEY);
- if (tableName == null) {
- throw new IllegalArgumentException("You must specify a valid table name");
- }
- String partitionsString = configuration.get(PARTITIONS_KEY);
- if (partitionsString != null) {
+ if (partitions != null) {
partitioned = true;
- configurePartitions(partitionsString);
+ configurePartitions(partitions);
} else {
partitioned = false;
}
- azureAccountName = configuration.get(ACCOUNT_NAME_KEY);
- azureAccountKey = configuration.get(ACCOUNT_KEY_KEY);
- if (azureAccountName == null || azureAccountKey == null) {
- throw new IllegalArgumentException("You must specify a valid Azure account name and key");
- }
+ this.azureAccountName = accountName;
+ this.azureAccountKey = accountKey;
+ this.tableName = tableName;
+
connectionString = "DefaultEndpointsProtocol=http;" + "AccountName=" + azureAccountName + ";AccountKey="
+ azureAccountKey + ";";
try {
csa = CloudStorageAccount.parse(connectionString);
} catch (InvalidKeyException | URISyntaxException e) {
- throw new IllegalArgumentException("You must specify a valid Azure account name and key", e);
+ throw new AsterixException("You must specify a valid Azure account name and key", e);
}
}
- private void configurePartitions(String partitionsString) {
- String[] partitions = partitionsString.split(",");
+ private void configurePartitions(String[] partitions) {
lowKeys = new String[partitions.length];
highKeys = new String[partitions.length];
for (int i = 0; i < partitions.length; ++i) {
String[] loHi = partitions[i].split(":");
lowKeys[i] = loHi[0];
highKeys[i] = loHi[1];
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Partition " + i + " configured for keys " + lowKeys[i] + " to " + highKeys[i]);
+ }
}
}