modified PullBasedAdaptor to account for the case when data is not available at the external data source for sufficiently longer duration of time
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index f0e51b2..09cf5e5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -32,6 +32,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.commons.lang3.StringUtils;
+
import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
import edu.uci.ics.asterix.api.common.SessionConfig;
import edu.uci.ics.asterix.aql.base.Statement;
@@ -41,6 +43,7 @@
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aql.translator.AqlTranslator;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.feeds.SuperFeedManager;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
@@ -353,80 +356,52 @@
Map<String, String> feedActivityDetails = new HashMap<String, String>();
StringBuilder ingestLocs = new StringBuilder();
for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
- feedInfo.ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(ingestOpId);
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ feedInfo.ingestLocations.add(operatorLocations.get(i));
+ }
}
StringBuilder computeLocs = new StringBuilder();
for (OperatorDescriptorId computeOpId : computeOperatorIds) {
- List<String> locations = info.getOperatorLocations().get(computeOpId);
- if (locations != null) {
- feedInfo.computeLocations.addAll(locations);
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(computeOpId);
+ if (operatorLocations != null) {
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ feedInfo.computeLocations.add(operatorLocations.get(i));
+ }
} else {
feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
}
}
+
StringBuilder storageLocs = new StringBuilder();
for (OperatorDescriptorId storageOpId : storageOperatorIds) {
- feedInfo.storageLocations.addAll(info.getOperatorLocations().get(storageOpId));
+ Map<Integer, String> operatorLocations = info.getOperatorLocations().get(storageOpId);
+ int nOperatorInstances = operatorLocations.size();
+ for (int i = 0; i < nOperatorInstances; i++) {
+ feedInfo.storageLocations.add(operatorLocations.get(i));
+ }
}
- for (String ingestLoc : feedInfo.ingestLocations) {
- ingestLocs.append(ingestLoc);
- ingestLocs.append(",");
- }
- if (ingestLocs.length() > 1) {
- ingestLocs.deleteCharAt(ingestLocs.length() - 1);
- }
- for (String computeLoc : feedInfo.computeLocations) {
- computeLocs.append(computeLoc);
- computeLocs.append(",");
- }
- if (computeLocs.length() > 1) {
- computeLocs.deleteCharAt(computeLocs.length() - 1);
- }
- for (String storageLoc : feedInfo.storageLocations) {
- storageLocs.append(storageLoc);
- storageLocs.append(",");
- }
- if (storageLocs.length() > 1) {
- storageLocs.deleteCharAt(storageLocs.length() - 1);
- }
+ ingestLocs.append(StringUtils.join(feedInfo.ingestLocations, ","));
+ computeLocs.append(StringUtils.join(feedInfo.computeLocations, ","));
+ storageLocs.append(StringUtils.join(feedInfo.storageLocations, ","));
feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
feedActivityDetails.put(FeedActivity.FeedActivityDetails.STORAGE_LOCATIONS, storageLocs.toString());
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME,
- feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+ String policyName = feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME, policyName);
- int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
- String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
-
- Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
- String instanceName = cluster.getInstanceName();
- String node = superFeedManagerHost.substring(instanceName.length() + 1);
- String hostIp = null;
- for (Node n : cluster.getNode()) {
- if (n.getId().equals(node)) {
- hostIp = n.getClusterIp();
- break;
+ FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedInfo.feedPolicy);
+ if (policyAccessor.collectStatistics() || policyAccessor.isElastic()) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Feed " + feedInfo.feedConnectionId + " requires Super Feed Manager");
}
- }
- if (hostIp == null) {
- throw new IllegalStateException("Unknown node " + superFeedManagerHost);
+ configureSuperFeedManager(feedInfo, feedActivityDetails);
}
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST, hostIp);
- feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT, ""
- + superFeedManagerPort);
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
- + superFeedManagerHost);
- }
-
- FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost,
- superFeedManagerPort, feedInfo.feedConnectionId);
- superFeedManagerPort += SuperFeedManager.PORT_RANGE_ASSIGNED;
- messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -450,6 +425,41 @@
}
+ private void configureSuperFeedManager(FeedInfo feedInfo, Map<String, String> feedActivityDetails) {
+ // TODO Auto-generated method stub
+ int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
+ String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
+
+ Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
+ String instanceName = cluster.getInstanceName();
+ String node = superFeedManagerHost.substring(instanceName.length() + 1);
+ String hostIp = null;
+ for (Node n : cluster.getNode()) {
+ if (n.getId().equals(node)) {
+ hostIp = n.getClusterIp();
+ break;
+ }
+ }
+ if (hostIp == null) {
+ throw new IllegalStateException("Unknown node " + superFeedManagerHost);
+ }
+
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_HOST, hostIp);
+ feedActivityDetails
+ .put(FeedActivity.FeedActivityDetails.SUPER_FEED_MANAGER_PORT, "" + superFeedManagerPort);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + hostIp + " node "
+ + superFeedManagerHost);
+ }
+
+ FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(hostIp, superFeedManagerHost,
+ superFeedManagerPort, feedInfo.feedConnectionId);
+ superFeedManagerPort += SuperFeedManager.PORT_RANGE_ASSIGNED;
+ messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
+
+ }
+
private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
index ee28c3a..2e79679 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
@@ -15,7 +15,6 @@
package edu.uci.ics.asterix.external.dataset.adapter;
import java.io.DataOutput;
-import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -28,15 +27,17 @@
}
/**
- * Writes the next fetched tuple into the provided instance of DatatOutput.
+ * Writes the next fetched tuple into the provided instance of DatatOutput. Invocation of this method blocks until
+ * a new tuple has been written or the specified time has expired.
*
* @param dataOutput
* The receiving channel for the feed client to write ADM records to.
- * @return true if a record was written to the DataOutput instance
- * false if no record was written to the DataOutput instance indicating non-availability of new data.
+ * @param timeout
+ * Threshold time (expressed in seconds) for the next tuple to be obtained from the externa source.
+ * @return
* @throws AsterixException
*/
- public InflowState nextTuple(DataOutput dataOutput) throws AsterixException;
+ public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
/**
* Provides logic for any corrective action that feed client needs to execute on
@@ -48,11 +49,4 @@
*/
public void resetOnFailure(Exception e) throws AsterixException;
- /**
- * @param configuration
- */
- public boolean alter(Map<String, String> configuration);
-
- public void stop();
-
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index e2a4b76..3f3ca50 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -41,6 +41,7 @@
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
+ private static final int timeout = 5; // seconds
protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
protected IPullBasedFeedClient pullBasedFeedClient;
@@ -52,6 +53,7 @@
private ByteBuffer frame;
private long tupleCount = 0;
private final IHyracksTaskContext ctx;
+ private int frameTupleCount = 0;
public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
@@ -72,24 +74,36 @@
pullBasedFeedClient = getFeedClient(partition);
InflowState inflowState = null;
+
while (continueIngestion) {
tupleBuilder.reset();
try {
- inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput());
+ // blocking call
+ inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), timeout);
switch (inflowState) {
case DATA_AVAILABLE:
tupleBuilder.addFieldEndOffset();
appendTupleToFrame(writer);
- tupleCount++;
+ frameTupleCount++;
break;
case NO_MORE_DATA:
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Reached end of feed");
}
FrameUtils.flushFrame(frame, writer);
+ tupleCount += frameTupleCount;
+ frameTupleCount = 0;
continueIngestion = false;
break;
case DATA_NOT_AVAILABLE:
+ if (frameTupleCount > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ tupleCount += frameTupleCount;
+ frameTupleCount = 0;
+ }
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Timed out on obtaining data from pull based adaptor. Trying again!");
+ }
break;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
index 8efe919..e728787 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
@@ -16,6 +16,8 @@
import java.io.DataOutput;
import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.builders.IARecordBuilder;
import edu.uci.ics.asterix.builders.RecordBuilder;
@@ -45,6 +47,8 @@
public abstract class PullBasedFeedClient implements IPullBasedFeedClient {
+ protected static final Logger LOGGER = Logger.getLogger(PullBasedFeedClient.class.getName());
+
protected ARecordSerializerDeserializer recordSerDe;
protected AMutableRecord mutableRecord;
protected boolean messageReceived;
@@ -69,28 +73,36 @@
public abstract InflowState setNextRecord() throws Exception;
@Override
- public InflowState nextTuple(DataOutput dataOutput) throws AsterixException {
+ public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
try {
- System.out.println("Setting next record");
- InflowState state = setNextRecord();
- boolean first = true;
- switch (state) {
- case DATA_AVAILABLE:
- IAType t = mutableRecord.getType();
- ATypeTag tag = t.getTypeTag();
- dataOutput.writeByte(tag.serialize());
- if (first) {
+ InflowState state = null;
+ int waitCount = 0;
+ boolean continueWait = true;
+ while ((state == null || state.equals(InflowState.DATA_NOT_AVAILABLE)) && continueWait) {
+ state = setNextRecord();
+ switch (state) {
+ case DATA_AVAILABLE:
+ IAType t = mutableRecord.getType();
+ ATypeTag tag = t.getTypeTag();
+ dataOutput.writeByte(tag.serialize());
recordBuilder.reset(mutableRecord.getType());
- first = false;
- }
- recordBuilder.init();
- writeRecord(mutableRecord, dataOutput, recordBuilder);
- break;
-
- case DATA_NOT_AVAILABLE:
- break;
- case NO_MORE_DATA:
- break;
+ recordBuilder.init();
+ writeRecord(mutableRecord, dataOutput, recordBuilder);
+ break;
+ case DATA_NOT_AVAILABLE:
+ if (waitCount > timeout) {
+ continueWait = false;
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Waiting to obtaing data from pull based adaptor");
+ }
+ Thread.sleep(1000);
+ waitCount++;
+ }
+ break;
+ case NO_MORE_DATA:
+ break;
+ }
}
return state;
} catch (Exception e) {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 019d1b7..5aafef4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -33,7 +33,6 @@
public static final String INTERVAL = "interval";
private ARecordType recordType;
- private final IHyracksTaskContext ctx;
private PullBasedTwitterFeedClient tweetClient;
@Override
@@ -43,15 +42,9 @@
public PullBasedTwitterAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) throws AsterixException {
super(configuration, ctx);
- this.ctx = ctx;
tweetClient = new PullBasedTwitterFeedClient(ctx, this);
}
- @Override
- public void stop() {
- tweetClient.stop();
- }
-
public ARecordType getAdapterOutputType() {
return recordType;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 7a5aeea..0cd14b8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -42,7 +42,6 @@
private String keywords;
private Query query;
- private String id_prefix;
private Twitter twitter;
private int requestInterval = 10; // seconds
private QueryResult result;
@@ -55,7 +54,6 @@
private static final Logger LOGGER = Logger.getLogger(PullBasedTwitterFeedClient.class.getName());
public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, PullBasedTwitterAdapter adapter) {
- this.id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
twitter = new TwitterFactory().getInstance();
mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
new AMutableString(null), new AMutableString(null) };
@@ -99,17 +97,6 @@
// TOOO: implement resetting logic for Twitter
}
- @Override
- public boolean alter(Map<String, String> configuration) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public void stop() {
- // TODO Auto-generated method stub
- }
-
private void initialize(Map<String, String> params) {
this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 8a4b301..0522d42 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -114,7 +114,6 @@
@SuppressWarnings("unchecked")
private void fetchFeed() {
try {
- System.err.println("Retrieving feed " + feedURL);
// Retrieve the feed.
// We will get a Feed Polled Event and then a
// Feed Retrieved event (assuming the feed is valid)
@@ -139,18 +138,6 @@
}
- @Override
- public boolean alter(Map<String, String> configuration) {
- // TODO Auto-generated method stub
- return false;
- }
-
- @Override
- public void stop() {
- // TODO Auto-generated method stub
-
- }
-
}
class FetcherEventListenerImpl implements FetcherListener {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index f89a7ff..1871111 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -2,6 +2,8 @@
import java.io.IOException;
import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
@@ -16,6 +18,8 @@
private static final long serialVersionUID = 1L;
+ protected static final Logger LOGGER = Logger.getLogger(StreamBasedAdapter.class.getName());
+
public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
public abstract InputStream getInputStream(int partition) throws IOException;
@@ -34,7 +38,12 @@
@Override
public void start(int partition, IFrameWriter writer) throws Exception {
InputStream in = getInputStream(partition);
- tupleParser.parse(in, writer);
+ if (in != null) {
+ tupleParser.parse(in, writer);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Could not obtain input stream for parsing from adaptor " + this + "[" + partition + "]");
+ }
+ }
}
-
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index 1380d9e..b9a5e73 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -138,6 +138,7 @@
adapter.start(partition, writer);
runtimeManager.setState(State.FINISHED_INGESTION);
} catch (Exception e) {
+ e.printStackTrace();
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Exception during feed ingestion " + e.getMessage());
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index da5fdde..fb55749 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -35,7 +35,7 @@
public static final FeedPolicy[] policies = new FeedPolicy[] { BRITTLE, BASIC, BASIC_MONITORED,
FAULT_TOLERANT_BASIC_MONITORED, ELASTIC };
- public static final FeedPolicy DEFAULT_POLICY = FAULT_TOLERANT_BASIC_MONITORED;
+ public static final FeedPolicy DEFAULT_POLICY = BASIC;
public static final String CONFIG_FEED_POLICY_KEY = "policy";
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index d3693bf..3297c2d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -244,7 +244,12 @@
((ITypedAdapterFactory) adapterFactory).configure(configuration);
break;
case GENERIC:
- String outputTypeName = configuration.get("output-type-name");
+ String outputTypeName = configuration.get(IGenericAdapterFactory.KEY_TYPE_NAME);
+ if (outputTypeName == null) {
+ throw new IllegalArgumentException(
+ "You must specify the datatype associated with the incoming data. Datatype is specified by the "
+ + IGenericAdapterFactory.KEY_TYPE_NAME + " configuration parameter");
+ }
adapterOutputType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
feed.getDataverseName(), outputTypeName).getDatatype();
((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
index 54613d0..16c3c80 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericAdapterFactory.java
@@ -20,6 +20,8 @@
public interface IGenericAdapterFactory extends IAdapterFactory {
+ public static final String KEY_TYPE_NAME = "type-name";
+
public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception;
}
diff --git a/asterix-tools/pom.xml b/asterix-tools/pom.xml
index 861dd79..43123f0 100644
--- a/asterix-tools/pom.xml
+++ b/asterix-tools/pom.xml
@@ -154,6 +154,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-metadata</artifactId>
+ <version>0.8.1-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.2</version>
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index 7c18670..b6f693a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -1,25 +1,5 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package edu.uci.ics.asterix.tools.external.data;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.HashSet;
@@ -27,233 +7,81 @@
import java.util.List;
import java.util.Random;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
+import edu.uci.ics.asterix.tools.external.data.TweetGenerator.DataMode;
public class DataGenerator {
private RandomDateGenerator randDateGen;
+
private RandomNameGenerator randNameGen;
- private RandomEmploymentGenerator randEmpGen;
+
private RandomMessageGenerator randMessageGen;
+
private RandomLocationGenerator randLocationGen;
- private DistributionHandler fbDistHandler;
- private DistributionHandler twDistHandler;
-
- private int totalFbMessages;
- private int numFbOnlyUsers;
- private int totalTwMessages;
- private int numTwOnlyUsers;
-
- private int numCommonUsers;
-
- private int fbUserId;
- private int twUserId;
-
- private int fbMessageId;
- private int twMessageId;
-
private Random random = new Random();
- private String commonUserFbSuffix = "_fb";
- private String commonUserTwSuffix = "_tw";
-
- private String outputDir;
-
- private PartitionConfiguration partition;
-
- private FacebookUser fbUser = new FacebookUser();
private TwitterUser twUser = new TwitterUser();
- private FacebookMessage fbMessage = new FacebookMessage();
private TweetMessage twMessage = new TweetMessage();
- private int duration;
-
- public DataGenerator(String[] args) throws Exception {
- String controllerInstallDir = args[0];
- String partitionConfXML = controllerInstallDir + "/output/partition-conf.xml";
- String partitionName = args[1];
- partition = XMLUtil.getPartitionConfiguration(partitionConfXML, partitionName);
-
- // 1
- randDateGen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
-
- String firstNameFile = controllerInstallDir + "/metadata/firstNames.txt";
- String lastNameFile = controllerInstallDir + "/metadata/lastNames.txt";
- String vendorFile = controllerInstallDir + "/metadata/vendors.txt";
- String jargonFile = controllerInstallDir + "/metadata/jargon.txt";
- String orgList = controllerInstallDir + "/metadata/org_list.txt";
-
- randNameGen = new RandomNameGenerator(firstNameFile, lastNameFile);
- randEmpGen = new RandomEmploymentGenerator(90, new Date(1, 1, 2000), new Date(8, 20, 2012), orgList);
- randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
- randMessageGen = new RandomMessageGenerator(vendorFile, jargonFile);
-
- totalFbMessages = partition.getTargetPartition().getFbMessageIdMax()
- - partition.getTargetPartition().getFbMessageIdMin() + 1;
- numFbOnlyUsers = (partition.getTargetPartition().getFbUserKeyMax()
- - partition.getTargetPartition().getFbUserKeyMin() + 1)
- - partition.getTargetPartition().getCommonUsers();
-
- totalTwMessages = partition.getTargetPartition().getTwMessageIdMax()
- - partition.getTargetPartition().getTwMessageIdMin() + 1;
- numTwOnlyUsers = (partition.getTargetPartition().getTwUserKeyMax()
- - partition.getTargetPartition().getTwUserKeyMin() + 1)
- - partition.getTargetPartition().getCommonUsers();
-
- numCommonUsers = partition.getTargetPartition().getCommonUsers();
- fbDistHandler = new DistributionHandler(totalFbMessages, 0.5, numFbOnlyUsers + numCommonUsers);
- twDistHandler = new DistributionHandler(totalTwMessages, 0.5, numTwOnlyUsers + numCommonUsers);
-
- fbUserId = partition.getTargetPartition().getFbUserKeyMin();
- twUserId = partition.getTargetPartition().getTwUserKeyMin();
-
- fbMessageId = partition.getTargetPartition().getFbMessageIdMin();
- twMessageId = partition.getTargetPartition().getTwMessageIdMin();
-
- outputDir = partition.getSourcePartition().getPath();
- }
-
public DataGenerator(InitializationInfo info) {
initialize(info);
}
- private void generateFacebookOnlyUsers(int numFacebookUsers) throws IOException {
- FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, true);
- FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, true);
-
- for (int i = 0; i < numFacebookUsers; i++) {
- getFacebookUser(null);
- appender.appendToFile(fbUser.toString());
- generateFacebookMessages(fbUser, messageAppender, -1);
- }
- appender.close();
- messageAppender.close();
- }
-
- private void generateTwitterOnlyUsers(int numTwitterUsers) throws IOException {
- FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, true);
- FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, true);
-
- for (int i = 0; i < numTwitterUsers; i++) {
- getTwitterUser(null);
- appender.appendToFile(twUser.toString());
- generateTwitterMessages(twUser, messageAppender, -1);
- }
- appender.close();
- messageAppender.close();
- }
-
- private void generateCommonUsers() throws IOException {
- FileAppender fbAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, false);
- FileAppender twAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, false);
- FileAppender fbMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, false);
- FileAppender twMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, false);
-
- for (int i = 0; i < numCommonUsers; i++) {
- getFacebookUser(commonUserFbSuffix);
- fbAppender.appendToFile(fbUser.toString());
- generateFacebookMessages(fbUser, fbMessageAppender, -1);
-
- getCorrespondingTwitterUser(fbUser);
- twAppender.appendToFile(twUser.toString());
- generateTwitterMessages(twUser, twMessageAppender, -1);
- }
-
- fbAppender.close();
- twAppender.close();
- fbMessageAppender.close();
- twMessageAppender.close();
- }
-
- private void generateFacebookMessages(FacebookUser user, FileAppender appender, int numMsg) throws IOException {
- Message message;
- int numMessages = 0;
- if (numMsg == -1) {
- numMessages = fbDistHandler
- .getFromDistribution(fbUserId - partition.getTargetPartition().getFbUserKeyMin());
- }
- for (int i = 0; i < numMessages; i++) {
- message = randMessageGen.getNextRandomMessage();
- Point location = randLocationGen.getRandomPoint();
- fbMessage.reset(fbMessageId++, user.getId(), random.nextInt(totalFbMessages + 1), location, message);
- appender.appendToFile(fbMessage.toString());
- }
- }
-
- private void generateTwitterMessages(TwitterUser user, FileAppender appender, int numMsg) throws IOException {
- Message message;
- int numMessages = 0;
- if (numMsg == -1) {
- numMessages = twDistHandler
- .getFromDistribution(twUserId - partition.getTargetPartition().getTwUserKeyMin());
- }
-
- for (int i = 0; i < numMessages; i++) {
- message = randMessageGen.getNextRandomMessage();
- Point location = randLocationGen.getRandomPoint();
- DateTime sendTime = randDateGen.getNextRandomDatetime();
- twMessage.reset(twMessageId, user, location, sendTime, message.getReferredTopics(), message);
- twMessageId++;
- appender.appendToFile(twMessage.toString());
- }
- }
-
- public Iterator<TweetMessage> getTwitterMessageIterator(int partition, byte seed) {
- return new TweetMessageIterator(duration, partition, seed);
- }
-
public class TweetMessageIterator implements Iterator<TweetMessage> {
private final int duration;
+ private final GULongIDGenerator[] idGens;
private long startTime = 0;
- private final GULongIDGenerator idGen;
+ private int uidGenInUse = 0;
+ private TweetMessage dummyMessage;
+ private DataMode dataMode;
- public TweetMessageIterator(int duration, int partition, byte seed) {
+ public TweetMessageIterator(int duration, GULongIDGenerator[] idGens, DataMode dataMode) {
this.duration = duration;
- this.idGen = new GULongIDGenerator(partition, seed);
+ this.idGens = idGens;
+ this.startTime = System.currentTimeMillis();
+ if (dataMode.equals(DataMode.REUSE_DATA)) {
+ dummyMessage = next();
+ }
+ this.dataMode = dataMode;
}
@Override
public boolean hasNext() {
- if (startTime == 0) {
- startTime = System.currentTimeMillis();
- }
- return System.currentTimeMillis() - startTime < duration * 1000;
+ return System.currentTimeMillis() - startTime <= duration * 1000;
}
@Override
public TweetMessage next() {
- getTwitterUser(null);
- Message message = randMessageGen.getNextRandomMessage();
- Point location = randLocationGen.getRandomPoint();
- DateTime sendTime = randDateGen.getNextRandomDatetime();
- twMessage.reset(idGen.getNextULong(), twUser, location, sendTime, message.getReferredTopics(), message);
- twMessageId++;
- if (twUserId > numTwOnlyUsers) {
- twUserId = 1;
+ TweetMessage msg = null;
+ switch (dataMode) {
+ case NEW_DATA:
+ getTwitterUser(null);
+ Message message = randMessageGen.getNextRandomMessage();
+ Point location = randLocationGen.getRandomPoint();
+ DateTime sendTime = randDateGen.getNextRandomDatetime();
+ twMessage.reset(idGens[uidGenInUse].getNextULong(), twUser, location, sendTime,
+ message.getReferredTopics(), message);
+ msg = twMessage;
+ break;
+ case REUSE_DATA:
+ dummyMessage.setTweetid(idGens[uidGenInUse].getNextULong());
+ msg = dummyMessage;
+ break;
}
- return twMessage;
-
+ return msg;
}
@Override
public void remove() {
// TODO Auto-generated method stub
+
+ }
+
+ public void toggleUidKeySpace() {
+ uidGenInUse = (uidGenInUse + 1) % idGens.length;
}
}
@@ -266,74 +94,13 @@
public String[] vendors = DataGenerator.vendors;
public String[] jargon = DataGenerator.jargon;
public String[] org_list = DataGenerator.org_list;
- public int percentEmployed = 90;
- public Date employmentStartDate = new Date(1, 1, 2000);
- public Date employmentEndDate = new Date(31, 12, 2012);
- public int totalFbMessages;
- public int numFbOnlyUsers;
- public int totalTwMessages;
- public int numTwOnlyUsers = 5000;
- public int numCommonUsers;
- public int fbUserIdMin;
- public int fbMessageIdMin;
- public int twUserIdMin;
- public int twMessageIdMin;
- public int timeDurationInSecs = 60;
-
}
public void initialize(InitializationInfo info) {
randDateGen = new RandomDateGenerator(info.startDate, info.endDate);
randNameGen = new RandomNameGenerator(info.firstNames, info.lastNames);
- randEmpGen = new RandomEmploymentGenerator(info.percentEmployed, info.employmentStartDate,
- info.employmentEndDate, info.org_list);
randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
randMessageGen = new RandomMessageGenerator(info.vendors, info.jargon);
- fbDistHandler = new DistributionHandler(info.totalFbMessages, 0.5, info.numFbOnlyUsers + info.numCommonUsers);
- twDistHandler = new DistributionHandler(info.totalTwMessages, 0.5, info.numTwOnlyUsers + info.numCommonUsers);
- fbUserId = info.fbUserIdMin;
- twUserId = info.twUserIdMin;
-
- fbMessageId = info.fbMessageIdMin;
- twMessageId = info.twMessageIdMin;
- duration = info.timeDurationInSecs;
- }
-
- public static void main(String args[]) throws Exception {
- if (args.length < 2) {
- printUsage();
- System.exit(1);
- }
-
- DataGenerator dataGenerator = new DataGenerator(args);
- dataGenerator.generateData();
- }
-
- public static void printUsage() {
- System.out.println(" Error: Invalid number of arguments ");
- System.out.println(" Usage :" + " DataGenerator <path to configuration file> <partition name> ");
- }
-
- public void generateData() throws IOException {
- generateFacebookOnlyUsers(numFbOnlyUsers);
- generateTwitterOnlyUsers(numTwOnlyUsers);
- generateCommonUsers();
- System.out.println("Partition :" + partition.getTargetPartition().getName() + " finished");
- }
-
- public void getFacebookUser(String usernameSuffix) {
- String suggestedName = randNameGen.getRandomName();
- String[] nameComponents = suggestedName.split(" ");
- String name = nameComponents[0] + nameComponents[1];
- if (usernameSuffix != null) {
- name = name + usernameSuffix;
- }
- String alias = nameComponents[0];
- String userSince = randDateGen.getNextRandomDatetime().toString();
- int numFriends = random.nextInt(25);
- int[] friendIds = RandomUtil.getKFromN(numFriends, (numFbOnlyUsers + numCommonUsers));
- Employment emp = randEmpGen.getRandomEmployment();
- fbUser.reset(fbUserId++, alias, name, userSince, friendIds, emp);
}
public void getTwitterUser(String usernameSuffix) {
@@ -348,17 +115,6 @@
int statusesCount = random.nextInt(500); // draw from Zipfian
int followersCount = random.nextInt((int) (200));
twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
- twUserId++;
- }
-
- public void getCorrespondingTwitterUser(FacebookUser fbUser) {
- String screenName = fbUser.getName().substring(0, fbUser.getName().lastIndexOf(commonUserFbSuffix))
- + commonUserTwSuffix;
- String name = screenName.split(" ")[0];
- int numFriends = random.nextInt((int) ((numTwOnlyUsers + numCommonUsers)));
- int statusesCount = random.nextInt(500); // draw from Zipfian
- int followersCount = random.nextInt((int) (numTwOnlyUsers + numCommonUsers));
- twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
}
public static class RandomDateGenerator {
@@ -427,15 +183,6 @@
return recentDate;
}
- public static void main(String args[]) throws Exception {
- RandomDateGenerator dgen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
- while (true) {
- Date nextDate = dgen.getNextRandomDate();
- if (nextDate.getDay() == 0) {
- throw new Exception("invalid date " + nextDate);
- }
- }
- }
}
public static class DateTime extends Date {
@@ -443,15 +190,15 @@
private String hour = "10";
private String min = "10";
private String sec = "00";
- private long chrononTime;
public DateTime(int month, int day, int year, String hour, String min, String sec) {
super(month, day, year);
this.hour = hour;
this.min = min;
this.sec = sec;
- chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
- Integer.parseInt(sec)).getTime();
+ }
+
+ public DateTime() {
}
public void reset(int month, int day, int year, String hour, String min, String sec) {
@@ -461,11 +208,6 @@
this.hour = hour;
this.min = min;
this.sec = sec;
- chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
- Integer.parseInt(sec)).getTime();
- }
-
- public DateTime() {
}
public DateTime(Date date) {
@@ -483,22 +225,6 @@
this.sec = (sec < 10) ? "0" : "" + sec;
}
- public long getChrononTime() {
- return chrononTime;
- }
-
- public String getHour() {
- return hour;
- }
-
- public String getMin() {
- return min;
- }
-
- public String getSec() {
- return sec;
- }
-
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("datetime");
@@ -532,10 +258,6 @@
length = 0;
}
- public char[] getMessage() {
- return message;
- }
-
public List<String> getReferredTopics() {
return referredTopics;
}
@@ -598,11 +320,6 @@
private final String[] connectors = new String[] { "_", "#", "$", "@" };
- public RandomNameGenerator(String firstNameFilePath, String lastNameFilePath) throws IOException {
- firstNames = FileUtil.listyFile(new File(firstNameFilePath)).toArray(new String[] {});
- lastNames = FileUtil.listyFile(new File(lastNameFilePath)).toArray(new String[] {});
- }
-
public RandomNameGenerator(String[] firstNames, String[] lastNames) {
this.firstNames = firstNames;
this.lastNames = lastNames;
@@ -631,12 +348,6 @@
private final MessageTemplate messageTemplate;
- public RandomMessageGenerator(String vendorFilePath, String jargonFilePath) throws IOException {
- List<String> vendors = FileUtil.listyFile(new File(vendorFilePath));
- List<String> jargon = FileUtil.listyFile(new File(jargonFilePath));
- this.messageTemplate = new MessageTemplate(vendors, jargon);
- }
-
public RandomMessageGenerator(String[] vendors, String[] jargon) {
List<String> vendorList = new ArrayList<String>();
for (String v : vendors) {
@@ -737,106 +448,15 @@
}
}
- public static class FileUtil {
-
- public static List<String> listyFile(File file) throws IOException {
- BufferedReader reader = new BufferedReader(new FileReader(file));
- String line;
- List<String> list = new ArrayList<String>();
- while (true) {
- line = reader.readLine();
- if (line == null) {
- break;
- }
- list.add(line);
- }
- reader.close();
- return list;
- }
-
- public static FileAppender getFileAppender(String filePath, boolean createIfNotExists, boolean overwrite)
- throws IOException {
- return new FileAppender(filePath, createIfNotExists, overwrite);
- }
- }
-
- public static class FileAppender {
-
- private final BufferedWriter writer;
-
- public FileAppender(String filePath, boolean createIfNotExists, boolean overwrite) throws IOException {
- File file = new File(filePath);
- if (!file.exists()) {
- if (createIfNotExists) {
- new File(file.getParent()).mkdirs();
- } else {
- throw new IOException("path " + filePath + " does not exists");
- }
- }
- this.writer = new BufferedWriter(new FileWriter(file, !overwrite));
- }
-
- public void appendToFile(String content) throws IOException {
- writer.append(content);
- writer.append("\n");
- }
-
- public void close() throws IOException {
- writer.close();
- }
- }
-
- public static class RandomEmploymentGenerator {
-
- private final int percentEmployed;
- private final Random random = new Random();
- private final RandomDateGenerator randDateGen;
- private final List<String> organizations;
- private Employment emp;
-
- public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String orgListPath)
- throws IOException {
- this.percentEmployed = percentEmployed;
- this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
- organizations = FileUtil.listyFile(new File(orgListPath));
- emp = new Employment();
- }
-
- public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String[] orgList) {
- this.percentEmployed = percentEmployed;
- this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
- organizations = new ArrayList<String>();
- for (String org : orgList) {
- organizations.add(org);
- }
- emp = new Employment();
- }
-
- public Employment getRandomEmployment() {
- int empployed = random.nextInt(100) + 1;
- boolean isEmployed = false;
- if (empployed <= percentEmployed) {
- isEmployed = true;
- }
- Date startDate = randDateGen.getNextRandomDate();
- Date endDate = null;
- if (!isEmployed) {
- endDate = randDateGen.getNextRecentDate(startDate);
- }
- String org = organizations.get(random.nextInt(organizations.size()));
- emp.reset(org, startDate, endDate);
- return emp;
- }
- }
-
public static class RandomLocationGenerator {
+ private Random random = new Random();
+
private final int beginLat;
private final int endLat;
private final int beginLong;
private final int endLong;
- private Random random = new Random();
private Point point;
public RandomLocationGenerator(int beginLat, int endLat, int beginLong, int endLong) {
@@ -862,417 +482,6 @@
}
- public static class PartitionConfiguration {
-
- private final TargetPartition targetPartition;
- private final SourcePartition sourcePartition;
-
- public PartitionConfiguration(SourcePartition sourcePartition, TargetPartition targetPartition) {
- this.sourcePartition = sourcePartition;
- this.targetPartition = targetPartition;
- }
-
- public TargetPartition getTargetPartition() {
- return targetPartition;
- }
-
- public SourcePartition getSourcePartition() {
- return sourcePartition;
- }
-
- }
-
- public static class SourcePartition {
-
- private final String name;
- private final String host;
- private final String path;
-
- public SourcePartition(String name, String host, String path) {
- this.name = name;
- this.host = host;
- this.path = path;
- }
-
- public String getName() {
- return name;
- }
-
- public String getHost() {
- return host;
- }
-
- public String getPath() {
- return path;
- }
- }
-
- public static class TargetPartition {
- private final String name;
- private final String host;
- private final String path;
- private final int fbUserKeyMin;
- private final int fbUserKeyMax;
- private final int twUserKeyMin;
- private final int twUserKeyMax;
- private final int fbMessageIdMin;
- private final int fbMessageIdMax;
- private final int twMessageIdMin;
- private final int twMessageIdMax;
- private final int commonUsers;
-
- public TargetPartition(String partitionName, String host, String path, int fbUserKeyMin, int fbUserKeyMax,
- int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
- int twMessageIdMax, int commonUsers) {
- this.name = partitionName;
- this.host = host;
- this.path = path;
- this.fbUserKeyMin = fbUserKeyMin;
- this.fbUserKeyMax = fbUserKeyMax;
- this.twUserKeyMin = twUserKeyMin;
- this.twUserKeyMax = twUserKeyMax;
- this.twMessageIdMin = twMessageIdMin;
- this.twMessageIdMax = twMessageIdMax;
- this.fbMessageIdMin = fbMessageIdMin;
- this.fbMessageIdMax = fbMessageIdMax;
- this.commonUsers = commonUsers;
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(name);
- builder.append(" ");
- builder.append(host);
- builder.append("\n");
- builder.append(path);
- builder.append("\n");
- builder.append("fbUser:key:min");
- builder.append(fbUserKeyMin);
-
- builder.append("\n");
- builder.append("fbUser:key:max");
- builder.append(fbUserKeyMax);
-
- builder.append("\n");
- builder.append("twUser:key:min");
- builder.append(twUserKeyMin);
-
- builder.append("\n");
- builder.append("twUser:key:max");
- builder.append(twUserKeyMax);
-
- builder.append("\n");
- builder.append("fbMessage:key:min");
- builder.append(fbMessageIdMin);
-
- builder.append("\n");
- builder.append("fbMessage:key:max");
- builder.append(fbMessageIdMax);
-
- builder.append("\n");
- builder.append("twMessage:key:min");
- builder.append(twMessageIdMin);
-
- builder.append("\n");
- builder.append("twMessage:key:max");
- builder.append(twMessageIdMax);
-
- builder.append("\n");
- builder.append("twMessage:key:max");
- builder.append(twMessageIdMax);
-
- builder.append("\n");
- builder.append("commonUsers");
- builder.append(commonUsers);
-
- return new String(builder);
- }
-
- public String getName() {
- return name;
- }
-
- public String getHost() {
- return host;
- }
-
- public int getFbUserKeyMin() {
- return fbUserKeyMin;
- }
-
- public int getFbUserKeyMax() {
- return fbUserKeyMax;
- }
-
- public int getTwUserKeyMin() {
- return twUserKeyMin;
- }
-
- public int getTwUserKeyMax() {
- return twUserKeyMax;
- }
-
- public int getFbMessageIdMin() {
- return fbMessageIdMin;
- }
-
- public int getFbMessageIdMax() {
- return fbMessageIdMax;
- }
-
- public int getTwMessageIdMin() {
- return twMessageIdMin;
- }
-
- public int getTwMessageIdMax() {
- return twMessageIdMax;
- }
-
- public int getCommonUsers() {
- return commonUsers;
- }
-
- public String getPath() {
- return path;
- }
- }
-
- public static class Employment {
-
- private String organization;
- private Date startDate;
- private Date endDate;
-
- public Employment(String organization, Date startDate, Date endDate) {
- this.organization = organization;
- this.startDate = startDate;
- this.endDate = endDate;
- }
-
- public Employment() {
- }
-
- public void reset(String organization, Date startDate, Date endDate) {
- this.organization = organization;
- this.startDate = startDate;
- this.endDate = endDate;
- }
-
- public String getOrganization() {
- return organization;
- }
-
- public Date getStartDate() {
- return startDate;
- }
-
- public Date getEndDate() {
- return endDate;
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder("");
- builder.append("{");
- builder.append("\"organization-name\":");
- builder.append("\"" + organization + "\"");
- builder.append(",");
- builder.append("\"start-date\":");
- builder.append(startDate);
- if (endDate != null) {
- builder.append(",");
- builder.append("\"end-date\":");
- builder.append(endDate);
- }
- builder.append("}");
- return new String(builder);
- }
-
- }
-
- public static class FacebookMessage {
-
- private int messageId;
- private int authorId;
- private int inResponseTo;
- private Point senderLocation;
- private Message message;
-
- public int getMessageId() {
- return messageId;
- }
-
- public int getAuthorID() {
- return authorId;
- }
-
- public Point getSenderLocation() {
- return senderLocation;
- }
-
- public Message getMessage() {
- return message;
- }
-
- public int getInResponseTo() {
- return inResponseTo;
- }
-
- public FacebookMessage() {
-
- }
-
- public FacebookMessage(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
- this.messageId = messageId;
- this.authorId = authorId;
- this.inResponseTo = inResponseTo;
- this.senderLocation = senderLocation;
- this.message = message;
- }
-
- public void reset(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
- this.messageId = messageId;
- this.authorId = authorId;
- this.inResponseTo = inResponseTo;
- this.senderLocation = senderLocation;
- this.message = message;
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("{");
- builder.append("\"message-id\":");
- builder.append(messageId);
- builder.append(",");
- builder.append("\"author-id\":");
- builder.append(authorId);
- builder.append(",");
- builder.append("\"in-response-to\":");
- builder.append(inResponseTo);
- builder.append(",");
- builder.append("\"sender-location\":");
- builder.append(senderLocation);
- builder.append(",");
- builder.append("\"message\":");
- builder.append("\"");
- for (int i = 0; i < message.getLength(); i++) {
- builder.append(message.charAt(i));
- }
- builder.append("\"");
- builder.append("}");
- return new String(builder);
- }
- }
-
- public static class FacebookUser {
-
- private int id;
- private String alias;
- private String name;
- private String userSince;
- private int[] friendIds;
- private Employment employment;
-
- public FacebookUser() {
-
- }
-
- public FacebookUser(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
- this.id = id;
- this.alias = alias;
- this.name = name;
- this.userSince = userSince;
- this.friendIds = friendIds;
- this.employment = employment;
- }
-
- public int getId() {
- return id;
- }
-
- public String getAlias() {
- return alias;
- }
-
- public String getName() {
- return name;
- }
-
- public String getUserSince() {
- return userSince;
- }
-
- public int[] getFriendIds() {
- return friendIds;
- }
-
- public Employment getEmployment() {
- return employment;
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("{");
- builder.append("\"id\":" + id);
- builder.append(",");
- builder.append("\"alias\":" + "\"" + alias + "\"");
- builder.append(",");
- builder.append("\"name\":" + "\"" + name + "\"");
- builder.append(",");
- builder.append("\"user-since\":" + userSince);
- builder.append(",");
- builder.append("\"friend-ids\":");
- builder.append("{{");
- for (int i = 0; i < friendIds.length; i++) {
- builder.append(friendIds[i]);
- builder.append(",");
- }
- if (friendIds.length > 0) {
- builder.deleteCharAt(builder.lastIndexOf(","));
- }
- builder.append("}}");
- builder.append(",");
- builder.append("\"employment\":");
- builder.append("[");
- builder.append(employment.toString());
- builder.append("]");
- builder.append("}");
- return builder.toString();
- }
-
- public void setId(int id) {
- this.id = id;
- }
-
- public void setAlias(String alias) {
- this.alias = alias;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public void setUserSince(String userSince) {
- this.userSince = userSince;
- }
-
- public void setFriendIds(int[] friendIds) {
- this.friendIds = friendIds;
- }
-
- public void setEmployment(Employment employment) {
- this.employment = employment;
- }
-
- public void reset(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
- this.id = id;
- this.alias = alias;
- this.name = name;
- this.userSince = userSince;
- this.friendIds = friendIds;
- this.employment = employment;
- }
- }
-
public static class TweetMessage {
private long tweetid;
@@ -1283,7 +492,6 @@
private Message messageText;
public TweetMessage() {
-
}
public TweetMessage(long tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
@@ -1461,478 +669,6 @@
}
- public static class DistributionHandler {
-
- private final ZipfGenerator zipfGen;
- private final int totalUsers;
- private final int totalMessages;
- private Random random = new Random();
-
- public DistributionHandler(int totalMessages, double skew, int totalNumUsers) {
- zipfGen = new ZipfGenerator(totalMessages, skew);
- totalUsers = totalNumUsers;
- this.totalMessages = totalMessages;
- }
-
- public int getFromDistribution(int rank) {
- double prob = zipfGen.getProbability(rank);
- int numMessages = (int) (prob * totalMessages);
- return numMessages;
- }
-
- public static void main(String args[]) {
- int totalMessages = 1000 * 4070;
- double skew = 0.5;
- int totalUsers = 4070;
- DistributionHandler d = new DistributionHandler(totalMessages, skew, totalUsers);
- int sum = 0;
- for (int i = totalUsers; i >= 1; i--) {
- float contrib = d.getFromDistribution(i);
- sum += contrib;
- System.out.println(i + ":" + contrib);
- }
-
- System.out.println("SUM" + ":" + sum);
-
- }
- }
-
- public static class ZipfGenerator {
-
- private Random rnd = new Random(System.currentTimeMillis());
- private int size;
- private double skew;
- private double bottom = 0;
-
- public ZipfGenerator(int size, double skew) {
- this.size = size;
- this.skew = skew;
- for (int i = 1; i < size; i++) {
- this.bottom += (1 / Math.pow(i, this.skew));
- }
- }
-
- // the next() method returns an rank id. The frequency of returned rank
- // ids are follows Zipf distribution.
- public int next() {
- int rank;
- double friquency = 0;
- double dice;
- rank = rnd.nextInt(size);
- friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
- dice = rnd.nextDouble();
- while (!(dice < friquency)) {
- rank = rnd.nextInt(size);
- friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
- dice = rnd.nextDouble();
- }
- return rank;
- }
-
- // This method returns a probability that the given rank occurs.
- public double getProbability(int rank) {
- return (1.0d / Math.pow(rank, this.skew)) / this.bottom;
- }
-
- public static void main(String[] args) throws IOException {
- int total = (int) (3.7 * 1000 * 1000);
- int skew = 2;
- int numUsers = 1000 * 1000;
- /*
- * if (args.length != 2) { System.out.println("usage:" +
- * "./zipf size skew"); System.exit(-1); }
- */
- BufferedWriter buf = new BufferedWriter(new FileWriter(new File("/tmp/zip_output")));
- ZipfGenerator zipf = new ZipfGenerator(total, skew);
- double sum = 0;
- for (int i = 1; i <= numUsers; i++) {
- double prob = zipf.getProbability(i);
- double contribution = (double) (prob * total);
- String contrib = i + ":" + contribution;
- buf.write(contrib);
- buf.write("\n");
- System.out.println(contrib);
- sum += contribution;
- }
- System.out.println("sum is :" + sum);
- }
- }
-
- public static class PartitionElement implements ILibraryElement {
- private final String name;
- private final String host;
- private final int fbUserKeyMin;
- private final int fbUserKeyMax;
- private final int twUserKeyMin;
- private final int twUserKeyMax;
- private final int fbMessageIdMin;
- private final int fbMessageIdMax;
- private final int twMessageIdMin;
- private final int twMessageIdMax;
-
- public PartitionElement(String partitionName, String host, int fbUserKeyMin, int fbUserKeyMax,
- int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
- int twMessageIdMax) {
- this.name = partitionName;
- this.host = host;
- this.fbUserKeyMin = fbUserKeyMin;
- this.fbUserKeyMax = fbUserKeyMax;
- this.twUserKeyMin = twUserKeyMax;
- this.twUserKeyMax = twUserKeyMax;
- this.twMessageIdMin = twMessageIdMin;
- this.twMessageIdMax = twMessageIdMax;
- this.fbMessageIdMin = fbMessageIdMin;
- this.fbMessageIdMax = fbMessageIdMax;
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(name);
- builder.append(" ");
- builder.append(host);
- builder.append("\n");
- builder.append("fbUser:key:min");
- builder.append(fbUserKeyMin);
-
- builder.append("\n");
- builder.append("fbUser:key:max");
- builder.append(fbUserKeyMax);
-
- builder.append("\n");
- builder.append("twUser:key:min");
- builder.append(twUserKeyMin);
-
- builder.append("\n");
- builder.append("twUser:key:max");
- builder.append(twUserKeyMax);
-
- builder.append("\n");
- builder.append("fbMessage:key:min");
- builder.append(fbMessageIdMin);
-
- builder.append("\n");
- builder.append("fbMessage:key:max");
- builder.append(fbMessageIdMax);
-
- builder.append("\n");
- builder.append("twMessage:key:min");
- builder.append(twMessageIdMin);
-
- builder.append("\n");
- builder.append("twMessage:key:max");
- builder.append(twMessageIdMax);
-
- builder.append("\n");
- builder.append("twMessage:key:max");
- builder.append(twUserKeyMin);
-
- return new String(builder);
- }
-
- @Override
- public String getName() {
- return "Partition";
- }
-
- }
-
- interface ILibraryElement {
-
- public enum ElementType {
- PARTITION
- }
-
- public String getName();
-
- }
-
- public static class Configuration {
-
- private final float numMB;
- private final String unit;
-
- private final List<SourcePartition> sourcePartitions;
- private List<TargetPartition> targetPartitions;
-
- public Configuration(float numMB, String unit, List<SourcePartition> partitions) throws IOException {
- this.numMB = numMB;
- this.unit = unit;
- this.sourcePartitions = partitions;
-
- }
-
- public float getNumMB() {
- return numMB;
- }
-
- public String getUnit() {
- return unit;
- }
-
- public List<SourcePartition> getSourcePartitions() {
- return sourcePartitions;
- }
-
- public List<TargetPartition> getTargetPartitions() {
- return targetPartitions;
- }
-
- public void setTargetPartitions(List<TargetPartition> targetPartitions) {
- this.targetPartitions = targetPartitions;
- }
-
- }
-
- public static class XMLUtil {
-
- public static void writeToXML(Configuration conf, String filePath) throws IOException,
- ParserConfigurationException, TransformerException {
-
- DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
-
- // root elements
- Document doc = docBuilder.newDocument();
- Element rootElement = doc.createElement("Partitions");
- doc.appendChild(rootElement);
-
- int index = 0;
- for (TargetPartition partition : conf.getTargetPartitions()) {
- writePartitionElement(conf.getSourcePartitions().get(index), partition, rootElement, doc);
- }
-
- TransformerFactory transformerFactory = TransformerFactory.newInstance();
- Transformer transformer = transformerFactory.newTransformer();
-
- transformer.setOutputProperty(OutputKeys.ENCODING, "utf-8");
- transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-
- DOMSource source = new DOMSource(doc);
- StreamResult result = new StreamResult(new File(filePath));
-
- transformer.transform(source, result);
-
- }
-
- public static void writePartitionInfo(Configuration conf, String filePath) throws IOException {
- BufferedWriter bw = new BufferedWriter(new FileWriter(filePath));
- for (SourcePartition sp : conf.getSourcePartitions()) {
- bw.write(sp.getHost() + ":" + sp.getName() + ":" + sp.getPath());
- bw.write("\n");
- }
- bw.close();
- }
-
- public static Document getDocument(String filePath) throws Exception {
- File inputFile = new File(filePath);
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- DocumentBuilder db = dbf.newDocumentBuilder();
- Document doc = db.parse(inputFile);
- doc.getDocumentElement().normalize();
- return doc;
- }
-
- public static Configuration getConfiguration(String filePath) throws Exception {
- Configuration conf = getConfiguration(getDocument(filePath));
- PartitionMetrics metrics = new PartitionMetrics(conf.getNumMB(), conf.getUnit(), conf.getSourcePartitions()
- .size());
- List<TargetPartition> targetPartitions = getTargetPartitions(metrics, conf.getSourcePartitions());
- conf.setTargetPartitions(targetPartitions);
- return conf;
- }
-
- public static Configuration getConfiguration(Document document) throws IOException {
- Element rootEle = document.getDocumentElement();
- NodeList nodeList = rootEle.getChildNodes();
- float size = Float.parseFloat(getStringValue((Element) nodeList, "size"));
- String unit = getStringValue((Element) nodeList, "unit");
- List<SourcePartition> sourcePartitions = getSourcePartitions(document);
- return new Configuration(size, unit, sourcePartitions);
- }
-
- public static List<SourcePartition> getSourcePartitions(Document document) {
- Element rootEle = document.getDocumentElement();
- NodeList nodeList = rootEle.getElementsByTagName("partition");
- List<SourcePartition> sourcePartitions = new ArrayList<SourcePartition>();
- for (int i = 0; i < nodeList.getLength(); i++) {
- Node node = nodeList.item(i);
- sourcePartitions.add(getSourcePartition((Element) node));
- }
- return sourcePartitions;
- }
-
- public static SourcePartition getSourcePartition(Element functionElement) {
- String name = getStringValue(functionElement, "name");
- String host = getStringValue(functionElement, "host");
- String path = getStringValue(functionElement, "path");
- SourcePartition sp = new SourcePartition(name, host, path);
- return sp;
- }
-
- public static String getStringValue(Element element, String tagName) {
- String textValue = null;
- NodeList nl = element.getElementsByTagName(tagName);
- if (nl != null && nl.getLength() > 0) {
- Element el = (Element) nl.item(0);
- textValue = el.getFirstChild().getNodeValue();
- }
- return textValue;
- }
-
- public static PartitionConfiguration getPartitionConfiguration(String filePath, String partitionName)
- throws Exception {
- PartitionConfiguration pconf = getPartitionConfigurations(getDocument(filePath), partitionName);
- return pconf;
- }
-
- public static PartitionConfiguration getPartitionConfigurations(Document document, String partitionName)
- throws IOException {
-
- Element rootEle = document.getDocumentElement();
- NodeList nodeList = rootEle.getElementsByTagName("Partition");
-
- for (int i = 0; i < nodeList.getLength(); i++) {
- Node node = nodeList.item(i);
- Element nodeElement = (Element) node;
- String name = getStringValue(nodeElement, "name");
- if (!name.equalsIgnoreCase(partitionName)) {
- continue;
- }
- String host = getStringValue(nodeElement, "host");
- String path = getStringValue(nodeElement, "path");
-
- String fbUserKeyMin = getStringValue(nodeElement, "fbUserKeyMin");
- String fbUserKeyMax = getStringValue(nodeElement, "fbUserKeyMax");
- String twUserKeyMin = getStringValue(nodeElement, "twUserKeyMin");
- String twUserKeyMax = getStringValue(nodeElement, "twUserKeyMax");
- String fbMessageKeyMin = getStringValue(nodeElement, "fbMessageKeyMin");
-
- String fbMessageKeyMax = getStringValue(nodeElement, "fbMessageKeyMax");
- String twMessageKeyMin = getStringValue(nodeElement, "twMessageKeyMin");
- String twMessageKeyMax = getStringValue(nodeElement, "twMessageKeyMax");
- String numCommonUsers = getStringValue(nodeElement, "numCommonUsers");
-
- SourcePartition sp = new SourcePartition(name, host, path);
-
- TargetPartition tp = new TargetPartition(partitionName, host, path, Integer.parseInt(fbUserKeyMin),
- Integer.parseInt(fbUserKeyMax), Integer.parseInt(twUserKeyMin), Integer.parseInt(twUserKeyMax),
- Integer.parseInt(fbMessageKeyMin), Integer.parseInt(fbMessageKeyMax),
- Integer.parseInt(twMessageKeyMin), Integer.parseInt(twMessageKeyMax),
- Integer.parseInt(numCommonUsers));
- PartitionConfiguration pc = new PartitionConfiguration(sp, tp);
- return pc;
- }
- return null;
- }
-
- public static List<TargetPartition> getTargetPartitions(PartitionMetrics metrics,
- List<SourcePartition> sourcePartitions) {
- List<TargetPartition> partitions = new ArrayList<TargetPartition>();
- int fbUserKeyMin = 1;
- int twUserKeyMin = 1;
- int fbMessageIdMin = 1;
- int twMessageIdMin = 1;
-
- for (SourcePartition sp : sourcePartitions) {
- int fbUserKeyMax = fbUserKeyMin + metrics.getFbOnlyUsers() + metrics.getCommonUsers() - 1;
- int twUserKeyMax = twUserKeyMin + metrics.getTwitterOnlyUsers() + metrics.getCommonUsers() - 1;
-
- int fbMessageIdMax = fbMessageIdMin + metrics.getFbMessages() - 1;
- int twMessageIdMax = twMessageIdMin + metrics.getTwMessages() - 1;
- TargetPartition pe = new TargetPartition(sp.getName(), sp.getHost(), sp.getPath(), fbUserKeyMin,
- fbUserKeyMax, twUserKeyMin, twUserKeyMax, fbMessageIdMin, fbMessageIdMax, twMessageIdMin,
- twMessageIdMax, metrics.getCommonUsers());
- partitions.add(pe);
-
- fbUserKeyMin = fbUserKeyMax + 1;
- twUserKeyMin = twUserKeyMax + 1;
-
- fbMessageIdMin = fbMessageIdMax + 1;
- twMessageIdMin = twMessageIdMax + 1;
- }
-
- return partitions;
- }
-
- public static void writePartitionElement(SourcePartition sourcePartition, TargetPartition partition,
- Element rootElement, Document doc) {
- // staff elements
- Element pe = doc.createElement("Partition");
- rootElement.appendChild(pe);
-
- // name element
- Element name = doc.createElement("name");
- name.appendChild(doc.createTextNode("" + partition.getName()));
- pe.appendChild(name);
-
- // host element
- Element host = doc.createElement("host");
- host.appendChild(doc.createTextNode("" + partition.getHost()));
- pe.appendChild(host);
-
- // path element
- Element path = doc.createElement("path");
- path.appendChild(doc.createTextNode("" + partition.getPath()));
- pe.appendChild(path);
-
- // fbUserKeyMin element
- Element fbUserKeyMin = doc.createElement("fbUserKeyMin");
- fbUserKeyMin.appendChild(doc.createTextNode("" + partition.getFbUserKeyMin()));
- pe.appendChild(fbUserKeyMin);
-
- // fbUserKeyMax element
- Element fbUserKeyMax = doc.createElement("fbUserKeyMax");
- fbUserKeyMax.appendChild(doc.createTextNode("" + partition.getFbUserKeyMax()));
- pe.appendChild(fbUserKeyMax);
-
- // twUserKeyMin element
- Element twUserKeyMin = doc.createElement("twUserKeyMin");
- twUserKeyMin.appendChild(doc.createTextNode("" + partition.getTwUserKeyMin()));
- pe.appendChild(twUserKeyMin);
-
- // twUserKeyMax element
- Element twUserKeyMax = doc.createElement("twUserKeyMax");
- twUserKeyMax.appendChild(doc.createTextNode("" + partition.getTwUserKeyMax()));
- pe.appendChild(twUserKeyMax);
-
- // fbMessgeKeyMin element
- Element fbMessageKeyMin = doc.createElement("fbMessageKeyMin");
- fbMessageKeyMin.appendChild(doc.createTextNode("" + partition.getFbMessageIdMin()));
- pe.appendChild(fbMessageKeyMin);
-
- // fbMessgeKeyMin element
- Element fbMessageKeyMax = doc.createElement("fbMessageKeyMax");
- fbMessageKeyMax.appendChild(doc.createTextNode("" + partition.getFbMessageIdMax()));
- pe.appendChild(fbMessageKeyMax);
-
- // twMessgeKeyMin element
- Element twMessageKeyMin = doc.createElement("twMessageKeyMin");
- twMessageKeyMin.appendChild(doc.createTextNode("" + partition.getTwMessageIdMin()));
- pe.appendChild(twMessageKeyMin);
-
- // twMessgeKeyMin element
- Element twMessageKeyMax = doc.createElement("twMessageKeyMax");
- twMessageKeyMax.appendChild(doc.createTextNode("" + partition.getTwMessageIdMax()));
- pe.appendChild(twMessageKeyMax);
-
- // twMessgeKeyMin element
- Element numCommonUsers = doc.createElement("numCommonUsers");
- numCommonUsers.appendChild(doc.createTextNode("" + partition.getCommonUsers()));
- pe.appendChild(numCommonUsers);
-
- }
-
- public static void main(String args[]) throws Exception {
- String confFile = "/Users/rgrove1/work/research/asterix/icde/data-gen/conf/conf.xml";
- String outputPath = "/Users/rgrove1/work/research/asterix/icde/data-gen/output/conf-output.xml";
- Configuration conf = getConfiguration(confFile);
- writeToXML(conf, outputPath);
- }
-
- }
-
public static class Date {
private int day;
@@ -1992,57 +728,6 @@
}
}
- public static class PartitionMetrics {
-
- private final int fbMessages;
- private final int twMessages;
-
- private final int fbOnlyUsers;
- private final int twitterOnlyUsers;
- private final int commonUsers;
-
- public PartitionMetrics(float number, String unit, int numPartitions) throws IOException {
-
- int factor = 0;
- if (unit.equalsIgnoreCase("MB")) {
- factor = 1024 * 1024;
- } else if (unit.equalsIgnoreCase("GB")) {
- factor = 1024 * 1024 * 1024;
- } else if (unit.equalsIgnoreCase("TB")) {
- factor = 1024 * 1024 * 1024 * 1024;
- } else
- throw new IOException("Invalid unit:" + unit);
-
- fbMessages = (int) (((number * factor * 0.80) / 258) / numPartitions);
- twMessages = (int) (fbMessages * 1.1 / 0.35);
-
- fbOnlyUsers = (int) ((number * factor * 0.20 * 0.0043)) / numPartitions;
- twitterOnlyUsers = (int) (0.25 * fbOnlyUsers);
- commonUsers = (int) (0.1 * fbOnlyUsers);
- }
-
- public int getFbMessages() {
- return fbMessages;
- }
-
- public int getTwMessages() {
- return twMessages;
- }
-
- public int getFbOnlyUsers() {
- return fbOnlyUsers;
- }
-
- public int getTwitterOnlyUsers() {
- return twitterOnlyUsers;
- }
-
- public int getCommonUsers() {
- return commonUsers;
- }
-
- }
-
public static String[] lastNames = { "Hoopengarner", "Harrow", "Gardner", "Blyant", "Best", "Buttermore", "Gronko",
"Mayers", "Countryman", "Neely", "Ruhl", "Taggart", "Bash", "Cason", "Hil", "Zalack", "Mingle", "Carr",
"Rohtin", "Wardle", "Pullman", "Wire", "Kellogg", "Hiles", "Keppel", "Bratton", "Sutton", "Wickes",
@@ -2482,4 +1167,4 @@
"Lexicone", "Fax-fax", "Viatechi", "Inchdox", "Kongreen", "Doncare", "Y-geohex", "Opeelectronics",
"Medflex", "Dancode", "Roundhex", "Labzatron", "Newhotplus", "Sancone", "Ronholdings", "Quoline",
"zoomplus", "Fix-touch", "Codetechno", "Tanzumbam", "Indiex", "Canline" };
-}
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
index 4cdda1e..278565a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -4,7 +4,6 @@
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -20,23 +19,14 @@
private static final long serialVersionUID = 1L;
- public static final String KEY_PORT = "port";
-
private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
- private Map<String, String> configuration;
-
private SocketFeedServer socketFeedServer;
- private static final int DEFAULT_PORT = 2909;
-
- public GenericSocketFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
- ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
+ public GenericSocketFeedAdapter(ITupleParserFactory parserFactory, ARecordType outputtype, int port,
+ IHyracksTaskContext ctx) throws AsterixException, IOException {
super(parserFactory, outputtype, ctx);
- this.configuration = configuration;
- String portValue = (String) this.configuration.get(KEY_PORT);
- int port = portValue != null ? Integer.parseInt(portValue) : DEFAULT_PORT;
- this.socketFeedServer = new SocketFeedServer(configuration, outputtype, port);
+ this.socketFeedServer = new SocketFeedServer(outputtype, port);
}
@Override
@@ -53,8 +43,7 @@
private ServerSocket serverSocket;
private InputStream inputStream;
- public SocketFeedServer(Map<String, String> configuration, ARecordType outputtype, int port)
- throws IOException, AsterixException {
+ public SocketFeedServer(ARecordType outputtype, int port) throws IOException, AsterixException {
try {
serverSocket = new ServerSocket(port);
} catch (Exception e) {
@@ -70,19 +59,27 @@
public InputStream getInputStream() {
Socket socket;
try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("waiting for client at " + serverSocket.getLocalPort());
+ }
socket = serverSocket.accept();
inputStream = socket.getInputStream();
} catch (IOException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Unable to create input stream required for feed ingestion");
}
- e.printStackTrace();
}
return inputStream;
}
public void stop() throws IOException {
- serverSocket.close();
+ try {
+ serverSocket.close();
+ } catch (IOException ioe) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to close socket at " + serverSocket.getLocalPort());
+ }
+ }
}
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 03c65c7..4fd0453 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -14,34 +14,52 @@
*/
package edu.uci.ics.asterix.tools.external.data;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
* Factory class for creating @see{GenericSocketFeedAdapter} The
* adapter listens at a port for receiving data (from external world).
- * Data received is transformed into Asterix Data Format (ADM) and stored into
- * a dataset a configured in the Adapter configuration.
+ * Data received is transformed into Asterix Data Format (ADM).
*/
public class GenericSocketFeedAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
- /**
- *
- */
private static final long serialVersionUID = 1L;
private ARecordType outputType;
+ private List<Pair<String, Integer>> sockets;
+
+ private Mode mode = Mode.IP;
+
+ public static final String KEY_SOCKETS = "sockets";
+
+ public static final String KEY_MODE = "address-type";
+
+ public static enum Mode {
+ NC,
+ IP
+ }
+
@Override
public String getName() {
- return "generic_socket_feed";
+ return "socket_adaptor";
}
@Override
@@ -59,16 +77,67 @@
this.configuration = configuration;
outputType = (ARecordType) outputType;
this.configureFormat(outputType);
+ this.configureSockets(configuration);
}
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(1);
+ List<String> locations = new ArrayList<String>();
+ for (Pair<String, Integer> socket : sockets) {
+ locations.add(socket.first);
+ }
+ return new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
}
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- return new GenericSocketFeedAdapter(configuration, parserFactory, outputType, ctx);
+ Pair<String, Integer> socket = sockets.get(partition);
+ return new GenericSocketFeedAdapter(parserFactory, outputType, socket.second, ctx);
}
+ private void configureSockets(Map<String, String> configuration) throws Exception {
+ sockets = new ArrayList<Pair<String, Integer>>();
+ String modeValue = configuration.get(KEY_MODE);
+ if (modeValue != null) {
+ mode = Mode.valueOf(modeValue.trim().toUpperCase());
+ }
+ String socketsValue = configuration.get(KEY_SOCKETS);
+ Map<String, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+ List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+ if (socketsValue == null) {
+ throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adaptor configuration");
+ }
+ String[] socketsArray = socketsValue.split(",");
+ Random random = new Random();
+ for (String socket : socketsArray) {
+ String[] socketTokens = socket.split(":");
+ String host = socketTokens[0];
+ int port = Integer.parseInt(socketTokens[1]);
+ Pair<String, Integer> p = null;
+ switch (mode) {
+ case IP:
+ Set<String> ncsOnIp = ncMap.get(host);
+ if (ncsOnIp == null || ncsOnIp.isEmpty()) {
+ throw new IllegalArgumentException("Invalid host " + host
+ + " as it is not part of the AsterixDB cluster. Valid choices are "
+ + StringUtils.join(ncMap.keySet(), ", "));
+ }
+ String[] ncArray = ncsOnIp.toArray(new String[] {});
+ String nc = ncArray[random.nextInt(ncArray.length)];
+ p = new Pair<String, Integer>(nc, port);
+ break;
+
+ case NC:
+ p = new Pair<String, Integer>(host, port);
+ if (!ncs.contains(host)) {
+ throw new IllegalArgumentException("Invalid NC " + host
+ + " as it is not part of the AsterixDB cluster. Valid choices are "
+ + StringUtils.join(ncs, ", "));
+
+ }
+ break;
+ }
+ sockets.add(p);
+ }
+ }
}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index b92c3fd..28ff14d 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package edu.uci.ics.asterix.tools.external.data;
import java.io.IOException;
@@ -25,49 +11,93 @@
public class TweetGenerator {
+ public static final String NUM_KEY_SPACES = "num-key-spaces";
public static final String KEY_DURATION = "duration";
public static final String KEY_TPS = "tps";
- public static final String KEY_MIN_TPS = "tps-min";
- public static final String KEY_MAX_TPS = "tps-max";
public static final String KEY_TPUT_DURATION = "tput-duration";
public static final String KEY_GUID_SEED = "guid-seed";
+ public static final String KEY_FRAME_WRITER_MODE = "frame-writer-mode";
+ public static final String KEY_DATA_MODE = "data-mode";
public static final String OUTPUT_FORMAT = "output-format";
public static final String OUTPUT_FORMAT_ARECORD = "arecord";
public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
+ private static final int DEFAULT_DURATION = 60;
+ private static final int DEFAULT_GUID_SEED = 0;
+
private int duration;
private TweetMessageIterator tweetIterator = null;
+ private int partition;
+ private int tweetCount = 0;
private int frameTweetCount = 0;
private int numFlushedTweets = 0;
private OutputStream os;
private DataGenerator dataGenerator = null;
private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
+ private GULongIDGenerator[] uidGenerators;
+ private int numUidGenerators;
+ private FrameWriterMode frameWriterMode;
+ private DataMode dataMode;
- public TweetGenerator(Map<String, String> configuration, int partition, String format) throws Exception {
- String value = configuration.get(KEY_DURATION);
- duration = value != null ? Integer.parseInt(value) : 60;
- InitializationInfo info = new InitializationInfo();
- info.timeDurationInSecs = duration;
- dataGenerator = new DataGenerator(info);
-
- String seedValue = configuration.get(KEY_GUID_SEED);
- int seedInt = seedValue != null ? Integer.parseInt(seedValue) : 0;
- tweetIterator = dataGenerator.new TweetMessageIterator(duration, partition, (byte) seedInt);
+ public int getTweetCount() {
+ return tweetCount;
}
- private void writeTweetString(TweetMessage next) throws IOException {
- String tweet = next.toString() + "\n";
- byte[] b = tweet.getBytes();
- if (outputBuffer.position() + b.length > outputBuffer.limit()) {
- flush();
- numFlushedTweets += frameTweetCount;
- frameTweetCount = 0;
- outputBuffer.put(b);
- frameTweetCount++;
- } else {
- outputBuffer.put(b);
- frameTweetCount++;
+ public enum DataMode {
+ REUSE_DATA,
+ NEW_DATA
+ }
+
+ public enum FrameWriterMode {
+ DUMMY_NO_PARSING,
+ PARSING
+ }
+
+ public TweetGenerator(Map<String, String> configuration, int partition, String format, OutputStream os)
+ throws Exception {
+ this.partition = partition;
+ String value = configuration.get(KEY_DURATION);
+ duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
+
+ value = configuration.get(KEY_DATA_MODE);
+ dataMode = value != null ? DataMode.valueOf(value) : DataMode.NEW_DATA;
+ numUidGenerators = configuration.get(NUM_KEY_SPACES) != null ? Integer.parseInt(configuration
+ .get(NUM_KEY_SPACES)) : 1;
+ uidGenerators = new GULongIDGenerator[numUidGenerators];
+
+ int guidSeed = configuration.get(KEY_GUID_SEED) != null ? Integer.parseInt(configuration.get(KEY_GUID_SEED))
+ : DEFAULT_GUID_SEED;
+
+ for (int i = 0; i < uidGenerators.length; i++) {
+ uidGenerators[i] = new GULongIDGenerator(partition, (byte) (i + guidSeed));
+ }
+
+ InitializationInfo info = new InitializationInfo();
+ dataGenerator = new DataGenerator(info);
+ value = configuration.get(KEY_FRAME_WRITER_MODE);
+ frameWriterMode = value != null ? FrameWriterMode.valueOf(value.toUpperCase()) : FrameWriterMode.PARSING;
+ dataMode = configuration.get(KEY_DATA_MODE) != null ? DataMode.valueOf(configuration.get(KEY_DATA_MODE))
+ : DataMode.NEW_DATA;
+ tweetIterator = dataGenerator.new TweetMessageIterator(duration, uidGenerators, dataMode);
+ this.os = os;
+ }
+
+ private void writeTweetString(TweetMessage tweetMessage) throws IOException {
+ String tweet = tweetMessage.toString() + "\n";
+ tweetCount++;
+ if (frameWriterMode.equals(FrameWriterMode.PARSING)) {
+ byte[] b = tweet.getBytes();
+ if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+ flush();
+ numFlushedTweets += frameTweetCount;
+ frameTweetCount = 0;
+ outputBuffer.put(b);
+ frameTweetCount++;
+ } else {
+ outputBuffer.put(b);
+ frameTweetCount++;
+ }
}
}
@@ -80,21 +110,22 @@
os.write(outputBuffer.array(), 0, outputBuffer.limit());
outputBuffer.position(0);
outputBuffer.limit(32 * 1024);
+ tweetIterator.toggleUidKeySpace();
}
public boolean setNextRecordBatch(int numTweetsInBatch) throws Exception {
- int count = 0;
- if (tweetIterator.hasNext()) {
+ boolean moreData = tweetIterator.hasNext();
+ if (!moreData) {
+ System.out.println("TWEET COUNT: [" + partition + "]" + tweetCount);
+ return false;
+ } else {
+ int count = 0;
while (count < numTweetsInBatch) {
writeTweetString(tweetIterator.next());
count++;
}
return true;
}
- return false;
}
- public void setOutputStream(OutputStream os) {
- this.os = os;
- }
-}
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index 07e018a..56e4673 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -1,30 +1,11 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package edu.uci.ics.asterix.tools.external.data;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.Date;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
@@ -39,6 +20,8 @@
/**
* TPS can be configured between 1 and 20,000
+ *
+ * @author ramang
*/
public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
@@ -46,180 +29,104 @@
private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseFeedAdapter.class.getName());
- private final TwitterServer twitterServer;
-
- private TwitterClient twitterClient;
-
- private static final String LOCALHOST = "127.0.0.1";
- private static final int PORT = 2909;
- private static final int TPUT_DURATION_DEFAULT = 5; // 5 seconds
+ private final Map<String, String> configuration;
private ExecutorService executorService = Executors.newCachedThreadPool();
+ private PipedOutputStream outputStream = new PipedOutputStream();
+
+ private PipedInputStream inputStream = new PipedInputStream(outputStream);
+
+ private final TwitterServer twitterServer;
+
public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
- ARecordType outputtype, IHyracksTaskContext ctx, int partition) throws Exception {
+ ARecordType outputtype, int partition, IHyracksTaskContext ctx) throws Exception {
super(parserFactory, outputtype, ctx);
- this.twitterServer = new TwitterServer(configuration, outputtype, executorService, partition);
- this.twitterClient = new TwitterClient(twitterServer.getPort());
+ this.configuration = configuration;
+ this.twitterServer = new TwitterServer(configuration, partition, outputtype, outputStream, executorService);
}
@Override
public void start(int partition, IFrameWriter writer) throws Exception {
twitterServer.start();
- twitterClient.start();
super.start(partition, writer);
}
@Override
public InputStream getInputStream(int partition) throws IOException {
- return twitterClient.getInputStream();
+ return inputStream;
}
private static class TwitterServer {
- private ServerSocket serverSocket;
- private final Listener listener;
- private int port = -1;
- private ExecutorService executorService;
+ private final DataProvider dataProvider;
+ private final ExecutorService executorService;
- public TwitterServer(Map<String, String> configuration, ARecordType outputtype,
- ExecutorService executorService, int partition) throws Exception {
- int numAttempts = 0;
- while (port < 0) {
- try {
- serverSocket = new ServerSocket(PORT + numAttempts);
- port = PORT + numAttempts;
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("port: " + (PORT + numAttempts) + " unusable ");
- }
- numAttempts++;
- }
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Twitter server configured to use port: " + port);
- }
- String dvds = configuration.get("dataverse-dataset");
- listener = new Listener(serverSocket, configuration, outputtype, dvds, partition);
+ public TwitterServer(Map<String, String> configuration, int partition, ARecordType outputtype, OutputStream os,
+ ExecutorService executorService) throws Exception {
+ dataProvider = new DataProvider(configuration, outputtype, partition, os);
this.executorService = executorService;
}
- public void start() {
- executorService.execute(listener);
- }
-
public void stop() throws IOException {
- listener.stop();
- serverSocket.close();
+ dataProvider.stop();
}
- public int getPort() {
- return port;
- }
- }
-
- private static class TwitterClient {
-
- private Socket socket;
- private int port;
-
- public TwitterClient(int port) throws UnknownHostException, IOException {
- this.port = port;
- }
-
- public InputStream getInputStream() throws IOException {
- return socket.getInputStream();
- }
-
- public void start() throws UnknownHostException, IOException {
- socket = new Socket(LOCALHOST, port);
+ public void start() {
+ executorService.execute(dataProvider);
}
}
- private static class Listener implements Runnable {
-
- private final ServerSocket serverSocket;
- private Socket socket;
- private TweetGenerator tweetGenerator;
- private boolean continuePush = true;
- private int fixedTps = -1;
- private int minTps = -1;
- private int maxTps = -1;
- private int tputDuration;
- private Rate task;
- private Mode mode;
+ private static class DataProvider implements Runnable {
public static final String KEY_MODE = "mode";
+ private TweetGenerator tweetGenerator;
+ private boolean continuePush = true;
+ private int batchSize;
+ private final Mode mode;
+ private final OutputStream os;
+
public static enum Mode {
AGGRESSIVE,
- CONTROLLED,
+ CONTROLLED
}
- public Listener(ServerSocket serverSocket, Map<String, String> configuration, ARecordType outputtype,
- String datasetName, int partition) throws Exception {
- this.serverSocket = serverSocket;
- this.tweetGenerator = new TweetGenerator(configuration, partition, TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
- String value = configuration.get(KEY_MODE);
- String confValue = null;
- if (value != null) {
- mode = Mode.valueOf(value.toUpperCase());
- switch (mode) {
- case AGGRESSIVE:
- break;
- case CONTROLLED:
- confValue = configuration.get(TweetGenerator.KEY_TPS);
- if (confValue != null) {
- minTps = Integer.parseInt(confValue);
- maxTps = minTps;
- fixedTps = minTps;
- } else {
- confValue = configuration.get(TweetGenerator.KEY_MIN_TPS);
- if (confValue != null) {
- minTps = Integer.parseInt(confValue);
- }
- confValue = configuration.get(TweetGenerator.KEY_MAX_TPS);
- if (confValue != null) {
- maxTps = Integer.parseInt(configuration.get(TweetGenerator.KEY_MAX_TPS));
- }
-
- if (minTps < 0 || maxTps < 0 || minTps > maxTps) {
- throw new IllegalArgumentException("Incorrect value for min/max TPS");
- }
- }
-
- }
- } else {
- mode = Mode.AGGRESSIVE;
+ public DataProvider(Map<String, String> configuration, ARecordType outputtype, int partition, OutputStream os)
+ throws Exception {
+ this.tweetGenerator = new TweetGenerator(configuration, partition, TweetGenerator.OUTPUT_FORMAT_ADM_STRING,
+ os);
+ this.os = os;
+ mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
+ : Mode.AGGRESSIVE;
+ switch (mode) {
+ case CONTROLLED:
+ String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
+ if (tpsValue == null) {
+ throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
+ }
+ batchSize = Integer.parseInt(tpsValue);
+ break;
+ case AGGRESSIVE:
+ batchSize = 5000;
+ break;
}
-
- value = configuration.get(TweetGenerator.KEY_TPUT_DURATION);
- tputDuration = value != null ? Integer.parseInt(value) : TPUT_DURATION_DEFAULT;
- task = new Rate(tweetGenerator, tputDuration, datasetName, partition);
}
@Override
public void run() {
+ boolean moreData = true;
+ long startBatch;
+ long endBatch;
+
while (true) {
try {
- socket = serverSocket.accept();
- OutputStream os = socket.getOutputStream();
- tweetGenerator.setOutputStream(os);
- boolean moreData = true;
- Timer timer = new Timer();
- timer.schedule(task, tputDuration * 1000, tputDuration * 1000);
- long startBatch;
- long endBatch;
- Random random = new Random();
- int batchSize = 0;
while (moreData && continuePush) {
switch (mode) {
+ case AGGRESSIVE:
+ moreData = tweetGenerator.setNextRecordBatch(batchSize);
+ break;
case CONTROLLED:
- if (maxTps > 0) {
- batchSize = minTps + random.nextInt((maxTps + 1) - minTps);
- } else {
- batchSize = fixedTps;
- }
startBatch = System.currentTimeMillis();
moreData = tweetGenerator.setNextRecordBatch(batchSize);
endBatch = System.currentTimeMillis();
@@ -227,30 +134,14 @@
Thread.sleep(1000 - (endBatch - startBatch));
}
break;
- case AGGRESSIVE:
- batchSize = Integer.MAX_VALUE;
- moreData = tweetGenerator.setNextRecordBatch(batchSize);
}
}
- timer.cancel();
os.close();
break;
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Exception in adaptor " + e.getMessage());
}
- } finally {
- try {
- if (socket != null && socket.isClosed()) {
- socket.close();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Closed socket:" + socket.getPort());
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
}
}
}
@@ -259,37 +150,6 @@
continuePush = false;
}
- private static class Rate extends TimerTask {
-
- private final TweetGenerator gen;
- private final int tputDuration;
- private final int partition;
- private final String dataset;
- private int prevMeasuredTweets = 0;
-
- public Rate(TweetGenerator gen, int tputDuration, String dataset, int partition) {
- this.gen = gen;
- this.tputDuration = tputDuration;
- this.dataset = dataset;
- this.partition = partition;
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(new Date() + " " + "Dataset" + " " + "partition" + " " + "Total flushed tweets"
- + "\t" + "intantaneous throughput");
- }
- }
-
- @Override
- public void run() {
- int currentMeasureTweets = gen.getNumFlushedTweets();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine(dataset + " " + partition + " " + gen.getNumFlushedTweets() + "\t"
- + ((currentMeasureTweets - prevMeasuredTweets) / tputDuration) + " ID "
- + Thread.currentThread().getId());
- }
- prevMeasuredTweets = currentMeasureTweets;
- }
-
- }
}
@Override
@@ -297,4 +157,9 @@
twitterServer.stop();
}
-}
+ @Override
+ public Map<String, String> getConfiguration() {
+ return configuration;
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 2305c32..41b1915 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -1,5 +1,5 @@
/*
-x * Copyright 2009-2012 by The Regents of the University of California
+x * Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -41,13 +41,6 @@
private static final long serialVersionUID = 1L;
/*
- * The dataverse and dataset names for the target feed dataset. This informaiton
- * is used in configuring partition constraints for the adapter. It is preferred that
- * the adapter location does not coincide with a partition location for the feed dataset.
- */
- private static final String KEY_DATAVERSE_DATASET = "dataverse-dataset";
-
- /*
* Degree of parallelism for feed ingestion activity. Defaults to 1.
* This builds up the count constraint for the ingestion operator.
*/
@@ -106,7 +99,7 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, ctx, partition);
+ return new TwitterFirehoseFeedAdapter(configuration, parserFactory, outputType, partition, ctx);
}
@Override