fault-tolerance checkpoint
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
new file mode 100644
index 0000000..dbcf566
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.api.http.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.logging.Level;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
+import edu.uci.ics.asterix.api.common.SessionConfig;
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.base.Statement.Kind;
+import edu.uci.ics.asterix.aql.parser.AQLParser;
+import edu.uci.ics.asterix.aql.parser.ParseException;
+import edu.uci.ics.asterix.aql.parser.TokenMgrError;
+import edu.uci.ics.asterix.aql.translator.AqlTranslator;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.result.ResultReader;
+import edu.uci.ics.asterix.result.ResultUtils;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
+
+public class FeedServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+
+ private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ response.setContentType("application/json");
+ response.setCharacterEncoding("utf-8");
+
+ PrintWriter out = response.getWriter();
+
+ DisplayFormat format = DisplayFormat.HTML;
+
+ String contentType = request.getContentType();
+
+ if ((contentType == null) || (contentType.equals("text/plain"))) {
+ format = DisplayFormat.TEXT;
+ } else if (contentType.equals("application/json")) {
+ format = DisplayFormat.JSON;
+ }
+
+
+ ServletContext context = getServletContext();
+ IHyracksClientConnection hcc;
+ IHyracksDataset hds;
+
+ try {
+ synchronized (context) {
+ hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
+
+ hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+ context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+ }
+ }
+
+ } catch (ParseException | TokenMgrError | edu.uci.ics.asterix.aqlplus.parser.TokenMgrError pe) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
+ String errorMessage = ResultUtils.buildParseExceptionMessage(pe, "");
+ JSONObject errorResp = ResultUtils.getErrorResponse(2, errorMessage, "", "");
+ out.write(errorResp.toString());
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ ResultUtils.apiErrorHandler(out, e);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index fd55a07..38f0fd2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -20,6 +20,7 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -83,7 +84,6 @@
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints;
import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
-import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -540,6 +540,7 @@
String nodegroupName;
String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
if (hintValue != null) {
+ int numChosen = 0;
boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
if (!valid) {
@@ -548,33 +549,35 @@
nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
}
Set<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+ Set<String> nodeNamesClone = new HashSet<String>();
+ for (String node : nodeNames) {
+ nodeNamesClone.add(node);
+ }
String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
List<String> selectedNodes = new ArrayList<String>();
selectedNodes.add(metadataNodeName);
- nodeNames.remove(metadataNodeName);
- nodegroupCardinality--;
+ numChosen++;
+ nodeNamesClone.remove(metadataNodeName);
- Random random = new Random();
- String[] nodes = nodeNames.toArray(new String[] {});
- int[] b = new int[nodeNames.size()];
- for (int i = 0; i < b.length; i++) {
- b[i] = i;
- }
+ if (numChosen < nodegroupCardinality) {
+ Random random = new Random();
+ String[] nodes = nodeNamesClone.toArray(new String[] {});
+ int[] b = new int[nodeNamesClone.size()];
+ for (int i = 0; i < b.length; i++) {
+ b[i] = i;
+ }
- for (int i = 0; i < nodegroupCardinality; i++) {
- int selected = i + random.nextInt(nodeNames.size() - i);
- int selNodeIndex = b[selected];
- selectedNodes.add(nodes[selNodeIndex]);
- int temp = b[0];
- b[0] = b[selected];
- b[selected] = temp;
+ for (int i = 0; i < nodegroupCardinality - numChosen; i++) {
+ int selected = i + random.nextInt(nodeNames.size() - i);
+ int selNodeIndex = b[selected];
+ selectedNodes.add(nodes[selNodeIndex]);
+ int temp = b[0];
+ b[0] = b[selected];
+ b[selected] = temp;
+ }
}
nodegroupName = dataverse + ":" + dd.getName().getValue();
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
- //TODO: Remove this hack. In future we would not mandate metadata node to be one of the
- // storage nodes. We require that currently so that ingestion node does not coincide with
- // the metadata node.
- nodeNames.add(metadataNodeName);
} else {
nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index cf7ac4d..5e52f3e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -18,13 +18,7 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
-import edu.uci.ics.asterix.metadata.feeds.FeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDisconnectFeedStatement;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -61,12 +55,9 @@
IOperatorDescriptor feedMessenger;
AlgebricksPartitionConstraint messengerPc;
- IFeedMessage feedMessage = null;
- feedMessage = new FeedMessage(MessageType.END);
-
try {
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider.buildFeedMessengerRuntime(
- metadataProvider, spec, dataverseName, feedName, datasetName, feedMessage, feedActivity);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider
+ .buildDisconnectFeedMessengerRuntime(spec, dataverseName, feedName, datasetName, feedActivity);
feedMessenger = p.first;
messengerPc = p.second;
} catch (AlgebricksException e) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
index 0a93a8e..07a53b3 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -17,7 +17,12 @@
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
import edu.uci.ics.hyracks.api.constraints.Constraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
@@ -57,17 +62,33 @@
orig.getFeedPolicy());
oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
} else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
- AsterixLSMTreeInsertDeleteOperatorDescriptor orig = (AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc;
- AsterixLSMTreeInsertDeleteOperatorDescriptor liop = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
- altered, orig.getRecordDescriptor(), orig.getStorageManager(),
- orig.getLifecycleManagerProvider(), orig.getFileSplitProvider(), orig.getTreeIndexTypeTraits(),
- orig.getComparatorFactories(), orig.getTreeIndexBloomFilterKeyFields(),
- orig.getFieldPermutations(), orig.getIndexOperation(), orig.getIndexDataflowHelperFactory(),
- orig.getTupleFilterFactory(), orig.getModificationOpCallbackFactory(), orig.isPrimary());
- oldNewOID.put(opDesc.getOperatorId(), liop.getOperatorId());
- } else {
FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
- feedPolicy);
+ feedPolicy, FeedRuntimeType.STORAGE);
+ /*
+ AsterixLSMTreeInsertDeleteOperatorDescriptor orig = (AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc;
+ AsterixLSMTreeInsertDeleteOperatorDescriptor liop = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
+ altered, orig.getRecordDescriptor(), orig.getStorageManager(),
+ orig.getLifecycleManagerProvider(), orig.getFileSplitProvider(), orig.getTreeIndexTypeTraits(),
+ orig.getComparatorFactories(), orig.getTreeIndexBloomFilterKeyFields(),
+ orig.getFieldPermutations(), orig.getIndexOperation(), orig.getIndexDataflowHelperFactory(),
+ orig.getTupleFilterFactory(), orig.getModificationOpCallbackFactory(), orig.isPrimary());
+ oldNewOID.put(opDesc.getOperatorId(), liop.getOperatorId());
+ */
+ oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+ } else {
+ FeedRuntimeType runtimeType = null;
+ if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
+ IPushRuntimeFactory runtimeFactory = ((AlgebricksMetaOperatorDescriptor) opDesc).getPipeline()
+ .getRuntimeFactories()[0];
+ if (runtimeFactory instanceof AssignRuntimeFactory) {
+ runtimeType = FeedRuntimeType.COMPUTE;
+ } else if (runtimeFactory instanceof StreamProjectRuntimeFactory) {
+ runtimeType = FeedRuntimeType.COMMIT;
+ }
+ }
+ FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
+ feedPolicy, runtimeType);
+
oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 73eeb7e..4b60764 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -23,6 +23,7 @@
import edu.uci.ics.asterix.api.http.servlet.APIServlet;
import edu.uci.ics.asterix.api.http.servlet.DDLAPIServlet;
+import edu.uci.ics.asterix.api.http.servlet.FeedServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryStatusAPIServlet;
@@ -46,6 +47,8 @@
private Server webServer;
private Server jsonAPIServer;
+ private Server feedServer;
+
private static IAsterixStateProxy proxy;
private ICCApplicationContext appCtx;
@@ -77,6 +80,9 @@
jsonAPIServer.start();
ExternalLibraryBootstrap.setUpExternaLibraries(false);
+ setupFeedServer(externalProperties);
+ feedServer.start();
+
ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
}
@@ -127,4 +133,17 @@
context.addServlet(new ServletHolder(new UpdateAPIServlet()), "/update");
context.addServlet(new ServletHolder(new DDLAPIServlet()), "/ddl");
}
+
+ private void setupFeedServer(AsterixExternalProperties externalProperties) throws Exception {
+ feedServer = new Server(externalProperties.getFeedServerPort());
+
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+
+ IHyracksClientConnection hcc = getNewHyracksClientConnection();
+ context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
+
+ feedServer.setHandler(context);
+ context.addServlet(new ServletHolder(new FeedServlet()), "/");
+ }
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 325dc2e..63be317 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
@@ -38,6 +39,9 @@
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aql.translator.AqlTranslator;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Cluster;
+import edu.uci.ics.asterix.file.JobSpecificationUtils;
import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener.FeedFailure.FailureType;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -48,6 +52,8 @@
import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
@@ -55,16 +61,23 @@
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedManagerElectMessage;
import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.SuperFeedManager;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties.State;
+import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -75,6 +88,8 @@
import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;
public class FeedLifecycleListener implements IJobLifecycleListener, IClusterEventsSubscriber, Serializable {
@@ -184,9 +199,14 @@
private static final long serialVersionUID = 1L;
private LinkedBlockingQueue<Message> inbox;
private Map<JobId, FeedInfo> registeredFeeds = new HashMap<JobId, FeedInfo>();
+ private FeedMessenger feedMessenger;
+ private LinkedBlockingQueue<FeedMessengerMessage> messengerOutbox;
public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
this.inbox = inbox;
+ messengerOutbox = new LinkedBlockingQueue<FeedMessengerMessage>();
+ feedMessenger = new FeedMessenger(messengerOutbox);
+ (new Thread(feedMessenger)).start();
}
public boolean isRegisteredFeed(JobId jobId) {
@@ -304,6 +324,15 @@
feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_POLICY_NAME,
feedInfo.feedPolicy.get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY));
+ int superFeedManagerIndex = new Random().nextInt(feedInfo.ingestLocations.size());
+ String superFeedManagerHost = feedInfo.ingestLocations.get(superFeedManagerIndex);
+
+ SuperFeedManager sfm = new SuperFeedManager(feedInfo.feedConnectionId, superFeedManagerHost, 3000);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Super Feed Manager for " + feedInfo.feedConnectionId + " is " + sfm);
+ }
+ FeedManagerElectMessage feedMessage = new FeedManagerElectMessage(sfm);
+ messengerOutbox.add(new FeedMessengerMessage(feedMessage, feedInfo));
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -366,6 +395,55 @@
MetadataManager.INSTANCE.releaseWriteLatch();
}
}
+
+ public static class FeedMessengerMessage {
+ private final IFeedMessage message;
+ private final FeedInfo feedInfo;
+
+ public FeedMessengerMessage(IFeedMessage message, FeedInfo feedInfo) {
+ this.message = message;
+ this.feedInfo = feedInfo;
+ }
+
+ public IFeedMessage getMessage() {
+ return message;
+ }
+
+ public FeedInfo getFeedInfo() {
+ return feedInfo;
+ }
+ }
+
+ private static class FeedMessenger implements Runnable {
+
+ private final LinkedBlockingQueue<FeedMessengerMessage> inbox;
+
+ public FeedMessenger(LinkedBlockingQueue<FeedMessengerMessage> inbox) {
+ this.inbox = inbox;
+ }
+
+ public void run() {
+ while (true) {
+ FeedMessengerMessage message = null;
+ try {
+ message = inbox.take();
+ FeedInfo feedInfo = message.getFeedInfo();
+ switch (message.getMessage().getMessageType()) {
+ case SUPER_FEED_MANAGER_ELECT:
+ Thread.sleep(2000);
+ sendSuperFeedManangerElectMessage(feedInfo,
+ (FeedManagerElectMessage) message.getMessage());
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Sent super feed manager election message" + message.getMessage());
+ }
+ }
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+ }
+
+ }
}
public static class FeedInfo {
@@ -622,6 +700,46 @@
}
}
+ private static void sendSuperFeedManangerElectMessage(FeedInfo feedInfo, FeedManagerElectMessage electMessage) {
+ try {
+ Dataverse dataverse = new Dataverse(feedInfo.feedConnectionId.getDataverse(),
+ NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT, 0);
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse);
+ JobSpecification spec = JobSpecificationUtils.createJobSpecification();
+
+ IOperatorDescriptor feedMessenger;
+ AlgebricksPartitionConstraint messengerPc;
+ List<String> locations = new ArrayList<String>();
+ locations.addAll(feedInfo.computeLocations);
+ locations.addAll(feedInfo.ingestLocations);
+ locations.addAll(feedInfo.storageLocations);
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = metadataProvider.buildSendFeedMessageRuntime(
+ spec, dataverse.getDataverseName(), feedInfo.feedConnectionId.getFeedName(),
+ feedInfo.feedConnectionId.getDatasetName(), electMessage, locations.toArray(new String[] {}));
+ feedMessenger = p.first;
+ messengerPc = p.second;
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, feedMessenger, messengerPc);
+
+ NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, nullSink, messengerPc);
+ spec.connect(new OneToOneConnectorDescriptor(spec), feedMessenger, 0, nullSink, 0);
+ spec.addRoot(nullSink);
+
+ JobId jobId = AsterixAppContextInfo.getInstance().getHcc().startJob(spec);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("IMPORTANT Super Feed Manager Message: " + electMessage + " Job Id " + jobId);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Exception in sending super feed manager elect message: " + feedInfo.feedConnectionId + " "
+ + e.getMessage());
+ }
+ }
+ }
+
public static class FeedFailure {
public enum FailureType {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
index e9039bc..e88bff8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedWorkRequestResponseHandler.java
@@ -120,7 +120,7 @@
}
JobSpecification spec = feedInfo.jobSpec;
System.out.println("Altered Job Spec \n" + spec);
- Thread.sleep(3000);
+ Thread.sleep(5000);
AsterixAppContextInfo.getInstance().getHcc().startJob(feedInfo.jobSpec);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
index 98c7c91..63a8057 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
@@ -27,6 +27,9 @@
private static final String EXTERNAL_APISERVER_KEY = "api.port";
private static int EXTERNAL_APISERVER_DEFAULT = 19002;
+ private static final String EXTERNAL_FEEDSERVER_KEY = "feed.port";
+ private static int EXTERNAL_FEEDSERVER_DEFAULT = 19003;
+
private static final String EXTERNAL_CC_JAVA_OPTS_KEY = "cc.java.opts";
private static String EXTERNAL_CC_JAVA_OPTS_DEFAULT = "-Xmx1024m";
@@ -47,6 +50,11 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public int getFeedServerPort() {
+ return accessor.getProperty(EXTERNAL_FEEDSERVER_KEY, EXTERNAL_FEEDSERVER_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
public Level getLogLevel() {
return accessor.getProperty(EXTERNAL_LOGLEVEL_KEY, EXTERNAL_LOGLEVEL_DEFAULT,
PropertyInterpreters.getLevelPropertyInterpreter());
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 129a1a7..e850619 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -76,6 +76,7 @@
}
}
} catch (Exception e) {
+ e.printStackTrace();
throw new HyracksDataException(e);
}
System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/AsterixEventServiceClient.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/AsterixEventServiceClient.java
index 084d30f..8947247 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/AsterixEventServiceClient.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/AsterixEventServiceClient.java
@@ -176,7 +176,8 @@
JAXBContext ctx = JAXBContext.newInstance(Configuration.class);
Marshaller marshaller = ctx.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
- String outputPathDir = System.getProperty("java.io.tmpdir") + File.separator + "conf";
+ String outputPathDir = System.getProperty("java.io.tmpdir") + File.separator + "conf-"
+ + System.getProperty("user.name");
new File(outputPathDir).mkdirs();
String outputPath = outputPathDir + File.separator + "configuration.xml";
marshaller.marshal(configuration, new FileOutputStream(outputPath));
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index e34c735..89368e7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -19,8 +19,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -58,6 +60,7 @@
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.EndFeedMessage;
import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
@@ -65,6 +68,7 @@
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
@@ -473,16 +477,42 @@
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
- AqlMetadataProvider metadataProvider, JobSpecification jobSpec, String dataverse, String feedName,
- String dataset, IFeedMessage feedMessage, FeedActivity feedActivity) throws AlgebricksException {
- AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(feedActivity
- .getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS).split(","));
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildSendFeedMessageRuntime(
+ JobSpecification jobSpec, String dataverse, String feedName, String dataset, IFeedMessage feedMessage,
+ String[] locations) throws AlgebricksException {
+ AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations);
FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, feedName,
dataset, feedMessage);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
}
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDisconnectFeedMessengerRuntime(
+ JobSpecification jobSpec, String dataverse, String feedName, String dataset, FeedActivity feedActivity)
+ throws AlgebricksException {
+ List<String> feedLocations = new ArrayList<String>();
+ String[] ingestLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS)
+ .split(",");
+ String[] computeLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS)
+ .split(",");
+ String[] storageLocs = feedActivity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS)
+ .split(",");
+
+ for (String loc : ingestLocs) {
+ feedLocations.add(loc);
+ }
+ for (String loc : computeLocs) {
+ feedLocations.add(loc);
+ }
+ for (String loc : storageLocs) {
+ feedLocations.add(loc);
+ }
+
+ FeedConnectionId feedId = new FeedConnectionId(dataverse, feedName, dataset);
+ String[] locations = feedLocations.toArray(new String[] {});
+ IFeedMessage feedMessage = new EndFeedMessage(feedId);
+ return buildSendFeedMessageRuntime(jobSpec, dataverse, feedName, dataset, feedMessage, locations);
+ }
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
index 62f779f..1112fdd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterRuntimeManager.java
@@ -14,12 +14,14 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
-import edu.uci.ics.asterix.metadata.feeds.MaterializingFrameWriter.Mode;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.metadata.feeds.FeedFrameWriter.Mode;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -56,7 +58,7 @@
FINISHED_INGESTION
}
- public AdapterRuntimeManager(FeedConnectionId feedId, IFeedAdapter feedAdapter, MaterializingFrameWriter writer,
+ public AdapterRuntimeManager(FeedConnectionId feedId, IFeedAdapter feedAdapter, FeedFrameWriter writer,
int partition, LinkedBlockingQueue<IFeedMessage> inbox) {
this.feedId = feedId;
this.feedAdapter = feedAdapter;
@@ -64,17 +66,18 @@
this.adapterExecutor = new AdapterExecutor(partition, writer, feedAdapter, this);
this.adapterRuntime = new Thread(adapterExecutor);
this.feedInboxMonitor = new FeedInboxMonitor(this, inbox, partition);
- AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
}
@Override
public void start() throws Exception {
state = State.ACTIVE_INGESTION;
- FeedManager.INSTANCE.registerFeedRuntime(this);
+ FeedRuntime ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, this);
+ ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registered feed runtime manager for " + this.getFeedId());
}
- adapterRuntime.start();
+ getFeedExecutorService().execute(feedInboxMonitor);
+ getFeedExecutorService().execute(adapterExecutor);
}
@Override
@@ -112,13 +115,13 @@
public static class AdapterExecutor implements Runnable {
- private MaterializingFrameWriter writer;
+ private FeedFrameWriter writer;
private IFeedAdapter adapter;
private AdapterRuntimeManager runtimeManager;
- public AdapterExecutor(int partition, MaterializingFrameWriter writer, IFeedAdapter adapter,
+ public AdapterExecutor(int partition, FeedFrameWriter writer, IFeedAdapter adapter,
AdapterRuntimeManager adapterRuntimeMgr) {
this.writer = writer;
this.adapter = adapter;
@@ -146,7 +149,7 @@
}
}
- public MaterializingFrameWriter getWriter() {
+ public FeedFrameWriter getWriter() {
return writer;
}
@@ -195,4 +198,9 @@
return partition;
}
+ @Override
+ public ExecutorService getFeedExecutorService() {
+ return FeedManager.INSTANCE.getFeedExecutorService(feedId);
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
index 9c0d3e5..73f668a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
@@ -27,8 +27,8 @@
private final Map<String, String> alteredConfParams;
- public AlterFeedMessage(Map<String, String> alteredConfParams) {
- super(MessageType.ALTER);
+ public AlterFeedMessage(Map<String, String> alteredConfParams, FeedConnectionId feedId) {
+ super(MessageType.ALTER, feedId);
this.alteredConfParams = alteredConfParams;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/EndFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/EndFeedMessage.java
new file mode 100644
index 0000000..fffee64
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/EndFeedMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+/**
+ * A feed control message indicating the need to end the feed. This message is dispatched
+ * to all locations that host an operator invovled in the feed pipeline.
+ */
+public class EndFeedMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FeedConnectionId feedId;
+
+ public EndFeedMessage(FeedConnectionId feedId) {
+ super(MessageType.END, feedId);
+ this.feedId = feedId;
+ }
+
+ public FeedConnectionId getFeedId() {
+ return feedId;
+ }
+
+ @Override
+ public String toString() {
+ return "" + feedId;
+ }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
new file mode 100644
index 0000000..c84560a
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -0,0 +1,349 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FeedFrameWriter implements IFrameWriter {
+
+ private static final Logger LOGGER = Logger.getLogger(FeedFrameWriter.class.getName());
+
+ private IFrameWriter writer;
+
+ private IOperatorNodePushable nodePushable;
+
+ private FeedPolicyEnforcer policyEnforcer;
+
+ private FeedConnectionId feedId;
+
+ private LinkedBlockingQueue<Long> statsOutbox;
+
+ private final boolean collectStatistics;
+
+ private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
+
+ private Mode mode;
+
+ private String nodeId;
+
+ private long FLUSH_THRESHOLD_TIME = 5000;
+
+ private FramePushWait framePushWait;
+
+ private SuperFeedManager sfm;
+
+ private FeedRuntimeType feedRuntimeType;
+
+ private int partition;
+
+ private Timer timer;
+
+ private ExecutorService executorService;
+
+ public enum Mode {
+ FORWARD,
+ STORE
+ }
+
+ public FeedFrameWriter(IFrameWriter writer, IOperatorNodePushable nodePushable, FeedConnectionId feedId,
+ FeedPolicyEnforcer policyEnforcer, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
+ ExecutorService executorService) {
+ this.writer = writer;
+ this.mode = Mode.FORWARD;
+ this.nodePushable = nodePushable;
+ this.feedId = feedId;
+ this.policyEnforcer = policyEnforcer;
+ this.feedRuntimeType = feedRuntimeType;
+ this.partition = partition;
+ this.executorService = executorService;
+ this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
+ if (collectStatistics) {
+ this.statsOutbox = new LinkedBlockingQueue<Long>();
+ Runnable task = new FeedOperatorStatisticsCollector(feedId, statsOutbox, nodePushable);
+ executorService.execute(task);
+ sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
+ framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, sfm, feedId, nodeId, feedRuntimeType,
+ partition);
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
+ }
+
+ }
+
+ public Mode getMode() {
+ return mode;
+ }
+
+ public void setMode(Mode newMode) throws HyracksDataException {
+ if (this.mode.equals(newMode)) {
+ return;
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Switching to :" + newMode + " from " + this.mode);
+ }
+ switch (newMode) {
+ case FORWARD:
+ this.mode = newMode;
+ break;
+ case STORE:
+ this.mode = newMode;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Beginning to store frames :");
+ LOGGER.info("Frames accumulated till now:" + frames.size());
+ }
+ break;
+ }
+
+ }
+
+ public List<ByteBuffer> getStoredFrames() {
+ return frames;
+ }
+
+ public void clear() {
+ frames.clear();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ switch (mode) {
+ case FORWARD:
+ try {
+ if (collectStatistics) {
+ framePushWait.notifyStart();
+ writer.nextFrame(buffer);
+ framePushWait.notifyFinish();
+ } else {
+ writer.nextFrame(buffer);
+ }
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Unable to write frame " + " on behalf of " + nodePushable.getDisplayName());
+ }
+ }
+ if (frames.size() > 0) {
+ for (ByteBuffer buf : frames) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Flusing OLD frame: " + buf + " on behalf of "
+ + nodePushable.getDisplayName());
+ }
+ writer.nextFrame(buf);
+ }
+ }
+ frames.clear();
+ break;
+ case STORE:
+ ByteBuffer storageBuffer = ByteBuffer.allocate(buffer.capacity());
+ storageBuffer.put(buffer);
+ frames.add(storageBuffer);
+ storageBuffer.flip();
+ break;
+ }
+ }
+
+ private static class FramePushWait extends TimerTask {
+
+ private long startTime = -1;
+ private IOperatorNodePushable nodePushable;
+ private State state;
+ private long flushThresholdTime;
+ private SuperFeedManager sfm;
+ private static final String EOL = "\n";
+ private FeedConnectionId feedId;
+ private String nodeId;
+ private FeedRuntimeType feedRuntimeType;
+ private int partition;
+
+ public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, SuperFeedManager sfm,
+ FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition) {
+ this.nodePushable = nodePushable;
+ this.flushThresholdTime = flushThresholdTime;
+ this.state = State.INTIALIZED;
+ this.sfm = sfm;
+ this.feedId = feedId;
+ this.nodeId = nodeId;
+ this.feedRuntimeType = feedRuntimeType;
+ this.partition = partition;
+ }
+
+ public void notifyStart() {
+ startTime = System.currentTimeMillis();
+ state = State.WAITING_FOR_FLUSH_COMPLETION;
+ }
+
+ public void notifyFinish() {
+ state = State.WAITNG_FOR_NEXT_FRAME;
+ }
+
+ @Override
+ public void run() {
+ if (state.equals(State.WAITING_FOR_FLUSH_COMPLETION)) {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - startTime > flushThresholdTime) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("CONGESTION!!!!!!!! BY " + nodePushable);
+ }
+ reportCongestionToSFM(currentTime - startTime);
+ }
+ }
+ }
+
+ private void reportCongestionToSFM(long waitingTime) {
+ String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
+ String operator = "" + feedRuntimeType;
+ String mesg = feedRep + "|" + operator + "|" + partition + "|" + waitingTime + "|" + EOL;
+ Socket sc = null;
+ try {
+ while (sfm == null) {
+ sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
+ if (sfm == null) {
+ Thread.sleep(2000);
+ } else {
+ break;
+ }
+ }
+ sc = new Socket(sfm.getHost(), sfm.getPort());
+ OutputStream os = sc.getOutputStream();
+ os.write(mesg.getBytes());
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to report congestion to " + sfm);
+ }
+ }
+ }
+
+ private enum State {
+ INTIALIZED,
+ WAITING_FOR_FLUSH_COMPLETION,
+ WAITNG_FOR_NEXT_FRAME
+ }
+
+ }
+
+ private static class FeedOperatorStatisticsCollector implements Runnable {
+
+ private final FeedConnectionId feedId;
+ private final LinkedBlockingQueue<Long> inbox;
+ private final long[] readings;
+ private int readingIndex = 0;
+ private int historySize = 10;
+ private double runningAvg = -1;
+ private double deviationPercentageThreshold = 50;
+ private int successiveThresholds = 0;
+ private IOperatorNodePushable coreOperatorNodePushable;
+ private int count;
+
+ public FeedOperatorStatisticsCollector(FeedConnectionId feedId, LinkedBlockingQueue<Long> inbox,
+ IOperatorNodePushable coreOperatorNodePushable) {
+ this.feedId = feedId;
+ this.inbox = inbox;
+ this.readings = new long[historySize];
+ this.coreOperatorNodePushable = coreOperatorNodePushable;
+ }
+
+ @Override
+ public void run() {
+ SuperFeedManager sfm = null;
+ try {
+ while (sfm == null) {
+ sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
+ if (sfm == null) {
+ Thread.sleep(2000);
+ }
+ }
+
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Obtained SFM " + sfm + " " + coreOperatorNodePushable.getDisplayName());
+ }
+ while (true) {
+ Long reading = inbox.take();
+ if (count != historySize) {
+ count++;
+ }
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Obtained Reading " + reading + " " + coreOperatorNodePushable.getDisplayName());
+ }
+ double newRunningAvg;
+ double deviation = 0;
+ if (runningAvg >= 0) {
+ int prevIndex = readingIndex == 0 ? historySize - 1 : readingIndex - 1;
+ newRunningAvg = (runningAvg * count - readings[prevIndex] + reading) / (count);
+ deviation = reading - runningAvg;
+ } else {
+ newRunningAvg = reading;
+ }
+
+ double devPercentage = (deviation * 100 / runningAvg);
+
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Current reading :" + reading + " Previous avg:" + runningAvg + " New Average: "
+ + newRunningAvg + " deviation % " + devPercentage + " Op "
+ + coreOperatorNodePushable.getDisplayName());
+ }
+
+ if (devPercentage > deviationPercentageThreshold) {
+ successiveThresholds++;
+ if (successiveThresholds > 1) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("CONGESTION in sending frames by "
+ + coreOperatorNodePushable.getDisplayName());
+ }
+ successiveThresholds = 0;
+ }
+ } else {
+ runningAvg = newRunningAvg;
+ readings[readingIndex] = reading;
+ readingIndex = (readingIndex + 1) % historySize;
+ }
+ }
+ } catch (InterruptedException ie) {
+ // do nothing
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (timer != null) {
+ timer.cancel();
+ }
+ writer.close();
+ }
+
+ public IFrameWriter getWriter() {
+ return writer;
+ }
+
+ public void setWriter(IFrameWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public String toString() {
+ return "MaterializingFrameWriter using " + writer;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 0889711..efa6cbb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -15,7 +15,11 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -33,6 +37,7 @@
public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
private final IAType atype;
private final FeedConnectionId feedId;
@@ -53,12 +58,24 @@
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
IFeedAdapter adapter;
+ FeedRuntimeId feedRuntimeId = new FeedRuntimeId(FeedRuntimeType.INGESTION, feedId, partition);
+ IngestionRuntime ingestionRuntime = (IngestionRuntime) FeedManager.INSTANCE.getFeedRuntime(feedRuntimeId);
try {
- adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx);
+ if (ingestionRuntime == null) {
+ adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Beginning new feed:" + feedId);
+ }
+ } else {
+ adapter = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager().getFeedAdapter();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Resuming old feed:" + feedId);
+ }
+ }
} catch (Exception e) {
throw new HyracksDataException("initialization of adapter failed", e);
}
- return new FeedIntakeOperatorNodePushable(feedId, adapter, feedPolicy, partition);
+ return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, feedPolicy, partition, ingestionRuntime);
}
public FeedConnectionId getFeedId() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index d6b7178..859990c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -15,11 +15,15 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -36,25 +40,34 @@
private final LinkedBlockingQueue<IFeedMessage> inbox;
private final Map<String, String> feedPolicy;
private final FeedPolicyEnforcer policyEnforcer;
- private AdapterRuntimeManager adapterRuntimeMgr;
+ private FeedRuntime ingestionRuntime;
+ private final String nodeId;
- public FeedIntakeOperatorNodePushable(FeedConnectionId feedId, IFeedAdapter adapter,
- Map<String, String> feedPolicy, int partition) {
+ public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedAdapter adapter,
+ Map<String, String> feedPolicy, int partition, IngestionRuntime ingestionRuntime) {
this.adapter = adapter;
this.partition = partition;
this.feedId = feedId;
+ this.ingestionRuntime = ingestionRuntime;
inbox = new LinkedBlockingQueue<IFeedMessage>();
this.feedPolicy = feedPolicy;
policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
+ nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
}
@Override
public void initialize() throws HyracksDataException {
- adapterRuntimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId, partition);
+
+ AdapterRuntimeManager adapterRuntimeMgr = null;
+ System.out.println("FEED INGESTION RUNTIME CALLED FOR " + partition);
try {
- if (adapterRuntimeMgr == null) {
- MaterializingFrameWriter mWriter = new MaterializingFrameWriter(writer);
+ if (ingestionRuntime == null) {
+ ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, adapterRuntimeMgr);
+ ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
+ FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
+ FeedRuntimeType.INGESTION, partition, executorService);
adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
+
if (adapter instanceof AbstractFeedDatasourceAdapter) {
((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
}
@@ -62,14 +75,17 @@
LOGGER.info("Beginning new feed:" + feedId);
}
mWriter.open();
+ System.out.println("STARTING FEED INGESTION RUNTIME FOR " + partition);
adapterRuntimeMgr.start();
} else {
+ adapterRuntimeMgr = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Resuming old feed:" + feedId);
}
adapter = adapterRuntimeMgr.getFeedAdapter();
writer.open();
adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
+ System.out.println("RESUMED FEED INGESTION RUNTIME FOR " + partition);
adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
}
@@ -78,17 +94,23 @@
adapterRuntimeMgr.wait();
}
}
- FeedManager.INSTANCE.deRegisterFeedRuntime(adapterRuntimeMgr);
+ FeedManager.INSTANCE.deregisterFeed(feedId);
} catch (InterruptedException ie) {
if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Continuing on failure as per feed policy");
}
adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
+ writer.fail();
+ /*
+ * Do not de-register feed
+ */
} else {
+ FeedManager.INSTANCE.deregisterFeed(feedId);
throw new HyracksDataException(ie);
}
} catch (Exception e) {
+ FeedManager.INSTANCE.deregisterFeed(feedId);
e.printStackTrace();
throw new HyracksDataException(e);
} finally {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 2ef9756..6e8cc63 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -14,58 +14,129 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
-import java.util.ArrayList;
+import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
/**
* Handle (de)registration of feeds for delivery of control messages.
*/
public class FeedManager implements IFeedManager {
+ private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
+
public static FeedManager INSTANCE = new FeedManager();
private FeedManager() {
}
- private Map<FeedConnectionId, List<AdapterRuntimeManager>> activeFeedRuntimeManagers = new HashMap<FeedConnectionId, List<AdapterRuntimeManager>>();
-
-
- @Override
- public synchronized void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
- List<AdapterRuntimeManager> adpaterRuntimeMgrs = activeFeedRuntimeManagers.get(adapterRuntimeMgr.getFeedId());
- if (adpaterRuntimeMgrs == null) {
- adpaterRuntimeMgrs = new ArrayList<AdapterRuntimeManager>();
- activeFeedRuntimeManagers.put(adapterRuntimeMgr.getFeedId(), adpaterRuntimeMgrs);
- }
- adpaterRuntimeMgrs.add(adapterRuntimeMgr);
+ private Map<FeedConnectionId, SuperFeedManager> superFeedManagers = new HashMap<FeedConnectionId, SuperFeedManager>();
+ private Map<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>> feedRuntimes = new HashMap<FeedConnectionId, Map<FeedRuntimeId, FeedRuntime>>();
+ private Map<FeedConnectionId, ExecutorService> feedExecutorService = new HashMap<FeedConnectionId, ExecutorService>();
+
+ public ExecutorService getFeedExecutorService(FeedConnectionId feedId) {
+ return feedExecutorService.get(feedId);
}
@Override
- public synchronized void deRegisterFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr) {
- List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(adapterRuntimeMgr.getFeedId());
- if (adapterRuntimeMgrs != null && adapterRuntimeMgrs.contains(adapterRuntimeMgr)) {
- adapterRuntimeMgrs.remove(adapterRuntimeMgr);
- }
- }
+ public void deregisterFeed(FeedConnectionId feedId) {
+ try {
+ Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedId);
+ if (feedRuntimesForFeed != null) {
+ feedRuntimesForFeed.clear();
+ }
- @Override
- public synchronized AdapterRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId, int partition) {
- List<AdapterRuntimeManager> adapterRuntimeMgrs = activeFeedRuntimeManagers.get(feedId);
- if (adapterRuntimeMgrs != null) {
- for (AdapterRuntimeManager mgr : adapterRuntimeMgrs) {
- if (mgr.getPartition() == partition) {
- return mgr;
+ feedRuntimes.remove(feedId);
+
+ SuperFeedManager sfm = superFeedManagers.get(feedId);
+ if (sfm != null && sfm.isLocal()) {
+ sfm.stop();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Shutdown super feed manager " + sfm);
}
}
+
+ ExecutorService executorService = feedExecutorService.remove(feedId);
+ if (executorService != null && !executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("unable to shutdown feed services for" + feedId);
+ }
+ }
+ }
+
+ @Override
+ public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime) {
+ ExecutorService execService = feedExecutorService.get(feedRuntime.getFeedRuntimeId().getFeedId());
+ if (execService == null) {
+ execService = Executors.newCachedThreadPool();
+ feedExecutorService.put(feedRuntime.getFeedRuntimeId().getFeedId(), execService);
+ }
+
+ Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntime.getFeedRuntimeId()
+ .getFeedId());
+ if (feedRuntimesForFeed == null) {
+ feedRuntimesForFeed = new HashMap<FeedRuntimeId, FeedRuntime>();
+ feedRuntimes.put(feedRuntime.getFeedRuntimeId().getFeedId(), feedRuntimesForFeed);
+ }
+ feedRuntimesForFeed.put(feedRuntime.getFeedRuntimeId(), feedRuntime);
+ System.out.println("REGISTERED feed runtime " + feedRuntime);
+ return execService;
+ }
+
+ @Override
+ public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId) {
+ Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntimeId.getFeedId());
+ if (feedRuntimesForFeed != null) {
+ FeedRuntime feedRuntime = feedRuntimesForFeed.get(feedRuntimeId);
+ if (feedRuntime != null) {
+ feedRuntimesForFeed.remove(feedRuntimeId);
+ if (feedRuntimesForFeed.isEmpty()) {
+ System.out.println("CLEARING OUT FEED RUNTIME INFO" + feedRuntimeId.getFeedId());
+ feedRuntimes.remove(feedRuntimeId.getFeedId());
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId) {
+ Map<FeedRuntimeId, FeedRuntime> feedRuntimesForFeed = feedRuntimes.get(feedRuntimeId.getFeedId());
+ if (feedRuntimesForFeed != null) {
+ return feedRuntimesForFeed.get(feedRuntimeId);
}
return null;
}
- public List<AdapterRuntimeManager> getFeedRuntimeManagers(FeedConnectionId feedId) {
- return activeFeedRuntimeManagers.get(feedId);
+ @Override
+ public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm) {
+ superFeedManagers.put(feedId, sfm);
+ }
+
+ @Override
+ public void deregisterSuperFeedManager(FeedConnectionId feedId) {
+ SuperFeedManager sfm = superFeedManagers.remove(feedId);
+ try {
+ sfm.stop();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId) {
+ return superFeedManagers.get(feedId);
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
new file mode 100644
index 0000000..103eb94
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManagerElectMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+/**
+ * A feed control message containing the altered values for
+ * adapter configuration parameters. This message is dispatched
+ * to all runtime instances of the feed's adapter.
+ */
+public class FeedManagerElectMessage extends FeedMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SuperFeedManager superFeedMaanger;
+
+ public FeedManagerElectMessage(SuperFeedManager superFeedManager) {
+ super(MessageType.SUPER_FEED_MANAGER_ELECT, superFeedManager.getFeedConnectionId());
+ this.superFeedMaanger = superFeedManager;
+ }
+
+ @Override
+ public MessageType getMessageType() {
+ return MessageType.SUPER_FEED_MANAGER_ELECT;
+ }
+
+ public SuperFeedManager getSuperFeedMaanger() {
+ return superFeedMaanger;
+ }
+
+ @Override
+ public String toString() {
+ return superFeedMaanger.toString();
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
index ae630ac..36fcf4b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
@@ -17,25 +17,27 @@
import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
/**
- * A control message that can be sent to the runtime instance of a
- * feed's adapter.
+ * A control message that can be sent to the runtime instance of a
+ * feed's adapter.
*/
public class FeedMessage implements IFeedMessage {
private static final long serialVersionUID = 1L;
- protected MessageType messageType;
+ protected final MessageType messageType;
+ protected final FeedConnectionId feedId;
- public FeedMessage(MessageType messageType) {
+ public FeedMessage(MessageType messageType, FeedConnectionId feedId) {
this.messageType = messageType;
+ this.feedId = feedId;
}
public MessageType getMessageType() {
return messageType;
}
- public void setMessageType(MessageType messageType) {
- this.messageType = messageType;
+ public FeedConnectionId getFeedId() {
+ return feedId;
}
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 5c41675..47e155c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -18,6 +18,9 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -32,36 +35,74 @@
private final FeedConnectionId feedId;
private final IFeedMessage feedMessage;
private final int partition;
+ private final IHyracksTaskContext ctx;
public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedMessage feedMessage,
int partition, int nPartitions) {
this.feedId = feedId;
this.feedMessage = feedMessage;
this.partition = partition;
+ this.ctx = ctx;
}
@Override
public void initialize() throws HyracksDataException {
try {
writer.open();
- AdapterRuntimeManager adapterRuntimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId, partition);
- if (adapterRuntimeMgr != null) {
- switch (feedMessage.getMessageType()) {
- case END:
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ending feed:" + feedId);
- }
+ FeedRuntimeId runtimeId = new FeedRuntimeId(FeedRuntimeType.INGESTION, feedId, partition);
+ FeedRuntime feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
+ boolean ingestionLocation = feedRuntime != null;
+
+ switch (feedMessage.getMessageType()) {
+ case END:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ending feed:" + feedId);
+ }
+
+ if (ingestionLocation) {
+ AdapterRuntimeManager adapterRuntimeMgr = ((IngestionRuntime) feedRuntime)
+ .getAdapterRuntimeManager();
adapterRuntimeMgr.stop();
- FeedManager.INSTANCE.deRegisterFeedRuntime(adapterRuntimeMgr);
- break;
- case ALTER:
+ System.out.println("STOPPED INGESTION !!!!!!!!!!!!!!!");
+ } else {
+ System.out.println("NOT AN INGESTION LOCATION !!!!!!!!!!!!!!!");
+ }
+ FeedManager.INSTANCE.deRegisterFeedRuntime(runtimeId);
+ break;
+
+ case SUPER_FEED_MANAGER_ELECT:
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registering SUPER Feed MGR for :" + feedId);
+ }
+ SuperFeedManager sfm = ((FeedManagerElectMessage) feedMessage).getSuperFeedMaanger();
+ synchronized (FeedManager.INSTANCE) {
+ if (FeedManager.INSTANCE.getSuperFeedManager(feedId) == null) {
+ FeedManager.INSTANCE.registerSuperFeedManager(feedId, sfm);
+ INCApplicationContext ncCtx = ctx.getJobletContext().getApplicationContext();
+ String nodeId = ncCtx.getNodeId();
+
+ if (sfm.getNodeId().equals(nodeId)) {
+ System.out.println("STARTED SUPER FEED MANAGER !!!!!!!!!!!");
+ sfm.setLocal(true);
+ sfm.start();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started Super Feed Manager for " + feedId);
+ }
+ }
+ }
+ }
+ break;
+
+ case ALTER:
+ if (ingestionLocation) {
+ AdapterRuntimeManager adapterRuntimeMgr = ((IngestionRuntime) feedRuntime)
+ .getAdapterRuntimeManager();
adapterRuntimeMgr.getFeedAdapter().alter(
((AlterFeedMessage) feedMessage).getAlteredConfParams());
- break;
- }
- } else {
- throw new AsterixException("Unknown feed: " + feedId);
+ }
+ break;
}
+
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 9acfc63..261585d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -1,10 +1,16 @@
package edu.uci.ics.asterix.metadata.feeds;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeState;
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -14,17 +20,21 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOGGER = Logger.getLogger(FeedMetaOperatorDescriptor.class.getName());
private IOperatorDescriptor coreOperator;
private final FeedConnectionId feedConnectionId;
private final FeedPolicy feedPolicy;
+ private final FeedRuntimeType runtimeType;
public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
- IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy) {
+ IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy, FeedRuntimeType runtimeType) {
super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
this.feedConnectionId = feedConnectionId;
this.feedPolicy = feedPolicy;
@@ -32,13 +42,14 @@
recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
}
this.coreOperator = coreOperatorDescriptor;
+ this.runtimeType = runtimeType;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
- feedConnectionId, feedPolicy);
+ feedConnectionId, feedPolicy, runtimeType);
}
@Override
@@ -50,54 +61,129 @@
private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
private FeedPolicyEnforcer policyEnforcer;
+ private FeedRuntime feedRuntime;
+ private FeedConnectionId feedId;
+ private int partition;
+ private ByteBuffer currentBuffer;
+ private final FeedRuntimeType runtimeType;
+ private boolean resumeOldState;
+ private ExecutorService feedExecService;
+ private String nodeId;
public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
- FeedPolicy feedPolicy) throws HyracksDataException {
+ FeedPolicy feedPolicy, FeedRuntimeType runtimeType) throws HyracksDataException {
this.coreOperatorNodePushable = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
.createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicy.getProperties());
+ this.partition = partition;
+ this.runtimeType = runtimeType;
+ this.feedId = feedConnectionId;
+ this.nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
}
@Override
public void open() throws HyracksDataException {
- coreOperatorNodePushable.setOutputFrameWriter(0, writer, recordDesc);
- coreOperatorNodePushable.open();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " open ");
+ FeedRuntimeId runtimeId = new FeedRuntimeId(runtimeType, feedId, partition);
+ System.out.println("TRYING TO OBTAIN FEED RUNTIME" + runtimeId);
+ feedRuntime = FeedManager.INSTANCE.getFeedRuntime(runtimeId);
+ if (feedRuntime == null) {
+ feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
+ feedExecService = FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
+ }
+ resumeOldState = false;
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
+ }
+ feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
+ resumeOldState = true;
}
-
+ FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId, runtimeType,
+ partition, feedExecService);
+ coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
+ coreOperatorNodePushable.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " received frame ");
- }
try {
+ if (resumeOldState) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Old state " + feedRuntime.getRuntimeState().getFrame());
+ }
+ coreOperatorNodePushable.nextFrame(feedRuntime.getRuntimeState().getFrame());
+ feedRuntime.setRuntimeState(null);
+ resumeOldState = false;
+ }
coreOperatorNodePushable.nextFrame(buffer);
} catch (HyracksDataException e) {
- // log tuple
- if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
+ if (policyEnforcer.getFeedPolicyAccessor().continueOnApplicationFailure()) {
+ boolean isExceptionHarmful = handleException(e.getCause());
+ if (isExceptionHarmful) {
+ // log the tuple
+ FeedRuntimeState runtimeState = new FeedRuntimeState(buffer, writer, e);
+ feedRuntime.setRuntimeState(runtimeState);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Harmful exception (parked data) " + e);
+ }
+ } else {
+ // ignore the frame (exception is expected)
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Ignoring exception " + e);
+ }
+ }
+
} else {
throw e;
}
}
}
+ private boolean handleException(Throwable exception) {
+ if (exception instanceof BTreeDuplicateKeyException) {
+ if (resumeOldState) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Received duplicate key exception but that is possible post recovery");
+ }
+ return false;
+ } else {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.warning("Received duplicate key exception!");
+ }
+ return true;
+ }
+ }
+ return true;
+ }
+
@Override
public void fail() throws HyracksDataException {
- if (LOGGER.isLoggable(Level.INFO)) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " fail ");
}
+ if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
+ if (currentBuffer != null) {
+ FeedRuntimeState runtimeState = new FeedRuntimeState(currentBuffer, writer, null);
+ feedRuntime.setRuntimeState(runtimeState);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Saved feed compute runtime for revivals" + feedRuntime.getFeedRuntimeId());
+ }
+ } else {
+ FeedManager.INSTANCE.deRegisterFeedRuntime(feedRuntime.getFeedRuntimeId());
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(" No state to save, de-registered feed compute runtime "
+ + feedRuntime.getFeedRuntimeId());
+ }
+ }
+ }
coreOperatorNodePushable.fail();
}
@Override
public void close() throws HyracksDataException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " close ");
- }
coreOperatorNodePushable.close();
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java
index cea68de..af25be2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java
@@ -9,10 +9,11 @@
public enum FeedRuntimeType {
INGESTION,
COMPUTE,
- STORAGE
+ STORAGE,
+ COMMIT
}
- protected FeedRuntimeId feedRuntimeId;
+ protected final FeedRuntimeId feedRuntimeId;
protected FeedRuntimeState runtimeState;
@@ -31,7 +32,7 @@
return feedRuntimeId + " " + "runtime state ? " + (runtimeState != null);
}
- private static class FeedRuntimeState {
+ public static class FeedRuntimeState {
private ByteBuffer frame;
private IFrameWriter frameWriter;
@@ -69,7 +70,7 @@
}
- private static class FeedRuntimeId {
+ public static class FeedRuntimeId {
private final FeedRuntimeType feedRuntimeType;
private final FeedConnectionId feedId;
@@ -93,6 +94,16 @@
return hashCode;
}
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof FeedRuntimeId) {
+ FeedRuntimeId oid = ((FeedRuntimeId) o);
+ return oid.getFeedId().equals(feedId) && oid.getFeedRuntimeType().equals(feedRuntimeType)
+ && oid.getPartition() == partition;
+ }
+ return false;
+ }
+
public FeedRuntimeType getFeedRuntimeType() {
return feedRuntimeType;
}
@@ -107,4 +118,16 @@
}
+ public FeedRuntimeState getRuntimeState() {
+ return runtimeState;
+ }
+
+ public void setRuntimeState(FeedRuntimeState runtimeState) {
+ this.runtimeState = runtimeState;
+ }
+
+ public FeedRuntimeId getFeedRuntimeId() {
+ return feedRuntimeId;
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
index d6f6f55..3c151f3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterExecutor.java
@@ -1,5 +1,7 @@
package edu.uci.ics.asterix.metadata.feeds;
+import java.util.concurrent.ExecutorService;
+
public interface IAdapterExecutor {
public void start() throws Exception;
@@ -8,4 +10,6 @@
public FeedConnectionId getFeedId();
+ public ExecutorService getFeedExecutorService();
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index c1147af..a0be4ff 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -14,26 +14,59 @@
*/
package edu.uci.ics.asterix.metadata.feeds;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
+
/**
* Handle (de)registration of feeds for delivery of control messages.
*/
public interface IFeedManager {
/**
- * @param adapterRuntimeMgr
+ * @param feedId
+ * @return
*/
- public void registerFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr);
+ public ExecutorService getFeedExecutorService(FeedConnectionId feedId);
/**
- * @param adapterRuntimeMgr
+ * @param feedRuntime
*/
- public void deRegisterFeedRuntime(AdapterRuntimeManager adapterRuntimeMgr);
+ public ExecutorService registerFeedRuntime(FeedRuntime feedRuntime);
+
+ /**
+ * @param feedRuntimeId
+ */
+ public void deRegisterFeedRuntime(FeedRuntimeId feedRuntimeId);
+
+ /**
+ * @param feedRuntimeId
+ * @return
+ */
+ public FeedRuntime getFeedRuntime(FeedRuntimeId feedRuntimeId);
/**
* @param feedId
- * @param partition
+ * @param sfm
+ */
+ public void registerSuperFeedManager(FeedConnectionId feedId, SuperFeedManager sfm);
+
+ /**
+ * @param feedId
+ */
+ public void deregisterSuperFeedManager(FeedConnectionId feedId);
+
+ /**
+ * @param feedId
* @return
*/
- public AdapterRuntimeManager getFeedRuntimeManager(FeedConnectionId feedId, int partition);
+ public SuperFeedManager getSuperFeedManager(FeedConnectionId feedId);
+
+ /**
+ * @param feedId
+ * @throws IOException
+ */
+ void deregisterFeed(FeedConnectionId feedId) throws IOException;
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
index 36a45be..1e08e8c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
@@ -21,6 +21,7 @@
public enum MessageType {
END,
ALTER,
+ SUPER_FEED_MANAGER_ELECT
}
public MessageType getMessageType();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
index 76ea93a..6faa44b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
index fa67dbb..37d49b4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IngestionRuntime.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.metadata.feeds;
public class IngestionRuntime extends FeedRuntime {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java
deleted file mode 100644
index b423b89..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MaterializingFrameWriter.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class MaterializingFrameWriter implements IFrameWriter {
-
- private static final Logger LOGGER = Logger.getLogger(MaterializingFrameWriter.class.getName());
-
- private IFrameWriter writer;
-
- private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
-
- private Mode mode;
-
- public enum Mode {
- FORWARD,
- STORE
- }
-
- public MaterializingFrameWriter(IFrameWriter writer) {
- this.writer = writer;
- this.mode = Mode.FORWARD;
- }
-
- public Mode getMode() {
- return mode;
- }
-
- public void setMode(Mode newMode) throws HyracksDataException {
- if (this.mode.equals(newMode)) {
- return;
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Switching to :" + newMode + " from " + this.mode);
- }
- switch (newMode) {
- case FORWARD:
- this.mode = newMode;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Sending accumulated frames :" + frames.size());
- }
- break;
- case STORE:
- this.mode = newMode;
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Beginning to store frames :");
- LOGGER.info("Frames accumulated till now:" + frames.size());
- }
- break;
- }
-
- }
-
- public List<ByteBuffer> getStoredFrames() {
- return frames;
- }
-
- public void clear() {
- frames.clear();
- }
-
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- switch (mode) {
- case FORWARD:
- writer.nextFrame(buffer);
- if (frames.size() > 0) {
- for (ByteBuffer buf : frames) {
- System.out.println("Flusing OLD frame: " + buf);
- writer.nextFrame(buf);
- }
- }
- frames.clear();
- break;
- case STORE:
- ByteBuffer storageBuffer = ByteBuffer.allocate(buffer.capacity());
- storageBuffer.put(buffer);
- frames.add(storageBuffer);
- storageBuffer.flip();
- break;
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- writer.close();
- }
-
- public IFrameWriter getWriter() {
- return writer;
- }
-
- public void setWriter(IFrameWriter writer) {
- this.writer = writer;
- }
-
- @Override
- public String toString() {
- return "MaterializingFrameWriter using " + writer;
- }
-}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
new file mode 100644
index 0000000..dc484a7
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.CharBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
+
+public class SuperFeedManager implements Serializable {
+
+ private static final Logger LOGGER = Logger.getLogger(SuperFeedManager.class.getName());
+
+ private static final long serialVersionUID = 1L;
+ private String host;
+
+ private final int port;
+
+ private final String nodeId;
+
+ private final FeedConnectionId feedConnectionId;
+
+ private SuperFeedManagerListener listener;
+
+ private boolean isLocal = false;
+
+ public SuperFeedManager(FeedConnectionId feedId, String nodeId, int port) throws Exception {
+ this.feedConnectionId = feedId;
+ this.nodeId = nodeId;
+ this.port = port;
+ initialize();
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getHost() throws Exception {
+ return host;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ private void initialize() throws Exception {
+ Map<String, Set<String>> ncs = AsterixRuntimeUtil.getNodeControllerMap();
+ for (Entry<String, Set<String>> entry : ncs.entrySet()) {
+ String ip = entry.getKey();
+ Set<String> nc = entry.getValue();
+ if (nc.contains(nodeId)) {
+ host = ip;
+ break;
+ }
+ }
+ }
+
+ public FeedConnectionId getFeedConnectionId() {
+ return feedConnectionId;
+ }
+
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ public void setLocal(boolean isLocal) {
+ this.isLocal = isLocal;
+ }
+
+ public void start() throws IOException {
+ if (listener == null) {
+ listener = new SuperFeedManagerListener(port);
+ listener.start();
+ }
+ }
+
+ public void stop() throws IOException {
+ if (listener != null) {
+ listener.stop();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return feedConnectionId + "[" + nodeId + "(" + host + ")" + ":" + port + "]";
+ }
+
+ private static class SuperFeedManagerListener implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private ServerSocket server;
+ private int port;
+ private LinkedBlockingQueue<String> messages;
+ private CongestionAnalyzer ca;
+
+ private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+ public SuperFeedManagerListener(int port) throws IOException {
+ this.port = port;
+ messages = new LinkedBlockingQueue<String>();
+ ca = new CongestionAnalyzer(messages);
+ executorService.execute(ca);
+ }
+
+ public void stop() {
+ executorService.shutdown();
+ }
+
+ public void start() throws IOException {
+ server = new ServerSocket(port);
+ while (true) {
+ Socket client = server.accept();
+ executorService.execute(new MessageProcessor(client, this));
+ }
+ }
+
+ public synchronized void notifyMessage(String s) {
+ messages.add(s);
+ }
+ }
+
+ private static class CongestionAnalyzer implements Runnable {
+
+ private LinkedBlockingQueue<String> messages;
+
+ public CongestionAnalyzer(LinkedBlockingQueue<String> messages) {
+ this.messages = messages;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ try {
+ String message = messages.take();
+ String[] msgComp = message.split("|");
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Congestion Reported" + message);
+ }
+ } catch (InterruptedException ie) {
+ throw ie;
+ } catch (Exception e) {
+
+ }
+
+ }
+
+ } catch (InterruptedException ie) {
+ // do nothing
+ }
+ }
+ }
+
+ private static class MessageProcessor implements Runnable {
+
+ private SuperFeedManagerListener listener;
+ private Socket client;
+ private static final char EOL = (char) "\n".getBytes()[0];
+
+ public MessageProcessor(Socket client, SuperFeedManagerListener listener) {
+ this.listener = listener;
+ this.client = client;
+ }
+
+ @Override
+ public void run() {
+ CharBuffer buffer = CharBuffer.allocate(2000);
+ char ch;
+ try {
+ InputStream in = client.getInputStream();
+ ch = (char) in.read();
+ while (ch != EOL) {
+ buffer.put(ch);
+ ch = (char) in.read();
+ }
+ buffer.flip();
+ String s = new String(buffer.array());
+ listener.notifyMessage(s);
+ } catch (IOException ioe) {
+ } finally {
+ try {
+ client.close();
+ } catch (IOException ioe) {
+ // do nothing
+ }
+ }
+ }
+ }
+}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 4943aea..9af206bb 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.HashSet;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -112,7 +113,7 @@
}
return ncConfig.get(IO_DEVICES).split(",").length;
}
-
+
/**
* Returns the IO devices configured for a Node Controller
*
@@ -141,13 +142,17 @@
return cluster;
}
- public Node getAvailableSubstitutionNode() {
+ public synchronized Node getAvailableSubstitutionNode() {
List<Node> subNodes = cluster.getSubstituteNodes() == null ? null : cluster.getSubstituteNodes().getNode();
return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
}
public synchronized Set<String> getParticipantNodes() {
- return ncConfiguration.keySet();
+ Set<String> participantNodes = new HashSet<String>();
+ for (String pNode : ncConfiguration.keySet()) {
+ participantNodes.add(pNode);
+ }
+ return participantNodes;
}
public synchronized AlgebricksPartitionConstraint getClusterLocations() {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator2.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator2.java
new file mode 100644
index 0000000..ad7daaf
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator2.java
@@ -0,0 +1,2483 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+public class DataGenerator2 {
+
+ private RandomDateGenerator randDateGen;
+ private RandomNameGenerator randNameGen;
+ private RandomEmploymentGenerator randEmpGen;
+ private RandomMessageGenerator randMessageGen;
+ private RandomLocationGenerator randLocationGen;
+
+ private DistributionHandler fbDistHandler;
+ private DistributionHandler twDistHandler;
+
+ private int totalFbMessages;
+ private int numFbOnlyUsers;
+ private int totalTwMessages;
+ private int numTwOnlyUsers;
+
+ private int numCommonUsers;
+
+ private int fbUserId;
+ private int twUserId;
+
+ private int fbMessageId;
+ private int twMessageId;
+
+ private Random random = new Random();
+
+ private String commonUserFbSuffix = "_fb";
+ private String commonUserTwSuffix = "_tw";
+
+ private String outputDir;
+
+ private PartitionConfiguration partition;
+
+ private FacebookUser fbUser = new FacebookUser();
+ private TwitterUser twUser = new TwitterUser();
+
+ private FacebookMessage fbMessage = new FacebookMessage();
+ private TweetMessage twMessage = new TweetMessage();
+
+ private int duration;
+
+ private DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+
+ public DataGenerator2(String[] args) throws Exception {
+ String controllerInstallDir = args[0];
+ String partitionConfXML = controllerInstallDir + "/output/partition-conf.xml";
+ String partitionName = args[1];
+ partition = XMLUtil.getPartitionConfiguration(partitionConfXML, partitionName);
+
+ // 1
+ randDateGen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
+
+ String firstNameFile = controllerInstallDir + "/metadata/firstNames.txt";
+ String lastNameFile = controllerInstallDir + "/metadata/lastNames.txt";
+ String vendorFile = controllerInstallDir + "/metadata/vendors.txt";
+ String jargonFile = controllerInstallDir + "/metadata/jargon.txt";
+ String orgList = controllerInstallDir + "/metadata/org_list.txt";
+
+ randNameGen = new RandomNameGenerator(firstNameFile, lastNameFile);
+ randEmpGen = new RandomEmploymentGenerator(90, new Date(1, 1, 2000), new Date(8, 20, 2012), orgList);
+ randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
+ randMessageGen = new RandomMessageGenerator(vendorFile, jargonFile);
+
+ totalFbMessages = partition.getTargetPartition().getFbMessageIdMax()
+ - partition.getTargetPartition().getFbMessageIdMin() + 1;
+ numFbOnlyUsers = (partition.getTargetPartition().getFbUserKeyMax()
+ - partition.getTargetPartition().getFbUserKeyMin() + 1)
+ - partition.getTargetPartition().getCommonUsers();
+
+ totalTwMessages = partition.getTargetPartition().getTwMessageIdMax()
+ - partition.getTargetPartition().getTwMessageIdMin() + 1;
+ numTwOnlyUsers = (partition.getTargetPartition().getTwUserKeyMax()
+ - partition.getTargetPartition().getTwUserKeyMin() + 1)
+ - partition.getTargetPartition().getCommonUsers();
+
+ numCommonUsers = partition.getTargetPartition().getCommonUsers();
+ fbDistHandler = new DistributionHandler(totalFbMessages, 0.5, numFbOnlyUsers + numCommonUsers);
+ twDistHandler = new DistributionHandler(totalTwMessages, 0.5, numTwOnlyUsers + numCommonUsers);
+
+ fbUserId = partition.getTargetPartition().getFbUserKeyMin();
+ twUserId = partition.getTargetPartition().getTwUserKeyMin();
+
+ fbMessageId = partition.getTargetPartition().getFbMessageIdMin();
+ twMessageId = partition.getTargetPartition().getTwMessageIdMin();
+
+ outputDir = partition.getSourcePartition().getPath();
+ }
+
+ public DataGenerator2(InitializationInfo info) {
+ initialize(info);
+ }
+
+ private void generateFacebookOnlyUsers(int numFacebookUsers) throws IOException {
+ FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, true);
+ FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, true);
+
+ for (int i = 0; i < numFacebookUsers; i++) {
+ getFacebookUser(null);
+ appender.appendToFile(fbUser.toString());
+ generateFacebookMessages(fbUser, messageAppender, -1);
+ }
+ appender.close();
+ messageAppender.close();
+ }
+
+ private void generateTwitterOnlyUsers(int numTwitterUsers) throws IOException {
+ FileAppender appender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, true);
+ FileAppender messageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, true);
+
+ for (int i = 0; i < numTwitterUsers; i++) {
+ getTwitterUser(null);
+ appender.appendToFile(twUser.toString());
+ generateTwitterMessages(twUser, messageAppender, -1);
+ }
+ appender.close();
+ messageAppender.close();
+ }
+
+ private void generateCommonUsers() throws IOException {
+ FileAppender fbAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_users.adm", true, false);
+ FileAppender twAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_users.adm", true, false);
+ FileAppender fbMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "fb_message.adm", true, false);
+ FileAppender twMessageAppender = FileUtil.getFileAppender(outputDir + "/" + "tw_message.adm", true, false);
+
+ for (int i = 0; i < numCommonUsers; i++) {
+ getFacebookUser(commonUserFbSuffix);
+ fbAppender.appendToFile(fbUser.toString());
+ generateFacebookMessages(fbUser, fbMessageAppender, -1);
+
+ getCorrespondingTwitterUser(fbUser);
+ twAppender.appendToFile(twUser.toString());
+ generateTwitterMessages(twUser, twMessageAppender, -1);
+ }
+
+ fbAppender.close();
+ twAppender.close();
+ fbMessageAppender.close();
+ twMessageAppender.close();
+ }
+
+ private void generateFacebookMessages(FacebookUser user, FileAppender appender, int numMsg) throws IOException {
+ Message message;
+ int numMessages = 0;
+ if (numMsg == -1) {
+ numMessages = fbDistHandler
+ .getFromDistribution(fbUserId - partition.getTargetPartition().getFbUserKeyMin());
+ }
+ for (int i = 0; i < numMessages; i++) {
+ message = randMessageGen.getNextRandomMessage();
+ Point location = randLocationGen.getRandomPoint();
+ fbMessage.reset(fbMessageId++, user.getId(), random.nextInt(totalFbMessages + 1), location, message);
+ appender.appendToFile(fbMessage.toString());
+ }
+ }
+
+ private void generateTwitterMessages(TwitterUser user, FileAppender appender, int numMsg) throws IOException {
+ Message message;
+ int numMessages = 0;
+ if (numMsg == -1) {
+ numMessages = twDistHandler
+ .getFromDistribution(twUserId - partition.getTargetPartition().getTwUserKeyMin());
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ message = randMessageGen.getNextRandomMessage();
+ Point location = randLocationGen.getRandomPoint();
+ DateTime sendTime = randDateGen.getNextRandomDatetime();
+ twMessage.reset(twMessageId + "", user, location, sendTime, message.getReferredTopics(), message);
+ twMessageId++;
+ appender.appendToFile(twMessage.toString());
+ }
+ }
+
+ public Iterator<TweetMessage> getTwitterMessageIterator() {
+ return new TweetMessageIterator(duration);
+ }
+
+ public class TweetMessageIterator implements Iterator<TweetMessage> {
+
+ private final int duration;
+ private long startTime = 0;
+
+ public TweetMessageIterator(int duration) {
+ this.duration = duration;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (startTime == 0) {
+ startTime = System.currentTimeMillis();
+ }
+ return System.currentTimeMillis() - startTime < duration * 1000;
+ }
+
+ @Override
+ public TweetMessage next() {
+ getTwitterUser(null);
+ Message message = randMessageGen.getNextRandomMessage();
+ Point location = randLocationGen.getRandomPoint();
+ DateTime sendTime = randDateGen.getNextRandomDatetime();
+ twMessage.reset(UUID.randomUUID().toString(), twUser, location, sendTime, message.getReferredTopics(),
+ message);
+ twMessageId++;
+ if (twUserId > numTwOnlyUsers) {
+ twUserId = 1;
+ }
+ return twMessage;
+
+ }
+
+ @Override
+ public void remove() {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ public static class InitializationInfo {
+ public Date startDate = new Date(1, 1, 2005);
+ public Date endDate = new Date(8, 20, 2012);
+ public String[] lastNames = DataGenerator.lastNames;
+ public String[] firstNames = DataGenerator.firstNames;
+ public String[] vendors = DataGenerator.vendors;
+ public String[] jargon = DataGenerator.jargon;
+ public String[] org_list = DataGenerator.org_list;
+ public int percentEmployed = 90;
+ public Date employmentStartDate = new Date(1, 1, 2000);
+ public Date employmentEndDate = new Date(31, 12, 2012);
+ public int totalFbMessages;
+ public int numFbOnlyUsers;
+ public int totalTwMessages;
+ public int numTwOnlyUsers = 5000;
+ public int numCommonUsers;
+ public int fbUserIdMin;
+ public int fbMessageIdMin;
+ public int twUserIdMin;
+ public int twMessageIdMin;
+ public int timeDurationInSecs = 60;
+
+ }
+
+ public void initialize(InitializationInfo info) {
+ randDateGen = new RandomDateGenerator(info.startDate, info.endDate);
+ randNameGen = new RandomNameGenerator(info.firstNames, info.lastNames);
+ randEmpGen = new RandomEmploymentGenerator(info.percentEmployed, info.employmentStartDate,
+ info.employmentEndDate, info.org_list);
+ randLocationGen = new RandomLocationGenerator(24, 49, 66, 98);
+ randMessageGen = new RandomMessageGenerator(info.vendors, info.jargon);
+ fbDistHandler = new DistributionHandler(info.totalFbMessages, 0.5, info.numFbOnlyUsers + info.numCommonUsers);
+ twDistHandler = new DistributionHandler(info.totalTwMessages, 0.5, info.numTwOnlyUsers + info.numCommonUsers);
+ fbUserId = info.fbUserIdMin;
+ twUserId = info.twUserIdMin;
+
+ fbMessageId = info.fbMessageIdMin;
+ twMessageId = info.fbMessageIdMin;
+ duration = info.timeDurationInSecs;
+ }
+
+ public static void main(String args[]) throws Exception {
+
+ String controllerInstallDir = null;
+ if (args.length < 2) {
+ printUsage();
+ System.exit(1);
+ }
+
+ DataGenerator2 dataGenerator = new DataGenerator2(args);
+ dataGenerator.generateData();
+ }
+
+ public static void printUsage() {
+ System.out.println(" Error: Invalid number of arguments ");
+ System.out.println(" Usage :" + " DataGenerator <path to configuration file> <partition name> ");
+ }
+
+ public void generateData() throws IOException {
+ generateFacebookOnlyUsers(numFbOnlyUsers);
+ generateTwitterOnlyUsers(numTwOnlyUsers);
+ generateCommonUsers();
+ System.out.println("Partition :" + partition.getTargetPartition().getName() + " finished");
+ }
+
+ public void getFacebookUser(String usernameSuffix) {
+ String suggestedName = randNameGen.getRandomName();
+ String[] nameComponents = suggestedName.split(" ");
+ String name = nameComponents[0] + nameComponents[1];
+ if (usernameSuffix != null) {
+ name = name + usernameSuffix;
+ }
+ String alias = nameComponents[0];
+ String userSince = randDateGen.getNextRandomDatetime().toString();
+ int numFriends = random.nextInt(25);
+ int[] friendIds = RandomUtil.getKFromN(numFriends, (numFbOnlyUsers + numCommonUsers));
+ Employment emp = randEmpGen.getRandomEmployment();
+ fbUser.reset(fbUserId++, alias, name, userSince, friendIds, emp);
+ }
+
+ public void getTwitterUser(String usernameSuffix) {
+ String suggestedName = randNameGen.getRandomName();
+ String[] nameComponents = suggestedName.split(" ");
+ String screenName = nameComponents[0] + nameComponents[1] + randNameGen.getRandomNameSuffix();
+ String name = suggestedName;
+ if (usernameSuffix != null) {
+ name = name + usernameSuffix;
+ }
+ int numFriends = random.nextInt((int) (100)); // draw from Zipfian
+ int statusesCount = random.nextInt(500); // draw from Zipfian
+ int followersCount = random.nextInt((int) (200));
+ twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
+ twUserId++;
+ }
+
+ public void getCorrespondingTwitterUser(FacebookUser fbUser) {
+ String screenName = fbUser.getName().substring(0, fbUser.getName().lastIndexOf(commonUserFbSuffix))
+ + commonUserTwSuffix;
+ String name = screenName.split(" ")[0];
+ int numFriends = random.nextInt((int) ((numTwOnlyUsers + numCommonUsers)));
+ int statusesCount = random.nextInt(500); // draw from Zipfian
+ int followersCount = random.nextInt((int) (numTwOnlyUsers + numCommonUsers));
+ twUser.reset(screenName, numFriends, statusesCount, name, followersCount);
+ }
+
+ public static class RandomDateGenerator {
+
+ private final Date startDate;
+ private final Date endDate;
+ private final Random random = new Random();
+ private final int yearDifference;
+ private Date workingDate;
+ private Date recentDate;
+ private DateTime dateTime;
+
+ public RandomDateGenerator(Date startDate, Date endDate) {
+ this.startDate = startDate;
+ this.endDate = endDate;
+ yearDifference = endDate.getYear() - startDate.getYear() + 1;
+ workingDate = new Date();
+ recentDate = new Date();
+ dateTime = new DateTime();
+ }
+
+ public Date getStartDate() {
+ return startDate;
+ }
+
+ public Date getEndDate() {
+ return endDate;
+ }
+
+ public Date getNextRandomDate() {
+ int year = random.nextInt(yearDifference) + startDate.getYear();
+ int month;
+ int day;
+ if (year == endDate.getYear()) {
+ month = random.nextInt(endDate.getMonth()) + 1;
+ if (month == endDate.getMonth()) {
+ day = random.nextInt(endDate.getDay()) + 1;
+ } else {
+ day = random.nextInt(28) + 1;
+ }
+ } else {
+ month = random.nextInt(12) + 1;
+ day = random.nextInt(28) + 1;
+ }
+ workingDate.reset(month, day, year);
+ return workingDate;
+ }
+
+ public DateTime getNextRandomDatetime() {
+ Date randomDate = getNextRandomDate();
+ dateTime.reset(randomDate);
+ return dateTime;
+ }
+
+ public Date getNextRecentDate(Date date) {
+ int year = date.getYear()
+ + (date.getYear() == endDate.getYear() ? 0 : random.nextInt(endDate.getYear() - date.getYear()));
+ int month = (year == endDate.getYear()) ? date.getMonth() == endDate.getMonth() ? (endDate.getMonth())
+ : (date.getMonth() + random.nextInt(endDate.getMonth() - date.getMonth())) : random.nextInt(12) + 1;
+
+ int day = (year == endDate.getYear()) ? month == endDate.getMonth() ? date.getDay() == endDate.getDay() ? endDate
+ .getDay() : date.getDay() + random.nextInt(endDate.getDay() - date.getDay())
+ : random.nextInt(28) + 1
+ : random.nextInt(28) + 1;
+ recentDate.reset(month, day, year);
+ return recentDate;
+ }
+
+ public static void main(String args[]) throws Exception {
+ Date date = new Date(2, 20, 2012);
+ RandomDateGenerator dgen = new RandomDateGenerator(new Date(1, 1, 2005), new Date(8, 20, 2012));
+ while (true) {
+ Date nextDate = dgen.getNextRandomDate();
+ if (nextDate.getDay() == 0) {
+ throw new Exception("invalid date " + nextDate);
+ }
+
+ // System.out.println(" original date: " + date);
+ System.out.println(nextDate);
+ }
+ }
+ }
+
+ public static class DateTime extends Date {
+
+ private String hour = "10";
+ private String min = "10";
+ private String sec = "00";
+ private long chrononTime;
+
+ public DateTime(int month, int day, int year, String hour, String min, String sec) {
+ super(month, day, year);
+ this.hour = hour;
+ this.min = min;
+ this.sec = sec;
+ chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
+ Integer.parseInt(sec)).getTime();
+ }
+
+ public void reset(int month, int day, int year, String hour, String min, String sec) {
+ super.setDay(month);
+ super.setDay(day);
+ super.setYear(year);
+ this.hour = hour;
+ this.min = min;
+ this.sec = sec;
+ chrononTime = new java.util.Date(year, month, day, Integer.parseInt(hour), Integer.parseInt(min),
+ Integer.parseInt(sec)).getTime();
+ }
+
+ public DateTime() {
+ }
+
+ public DateTime(Date date) {
+ super(date.getMonth(), date.getDay(), date.getYear());
+ }
+
+ public void reset(Date date) {
+ reset(date.getMonth(), date.getDay(), date.getYear());
+ }
+
+ public DateTime(Date date, int hour, int min, int sec) {
+ super(date.getMonth(), date.getDay(), date.getYear());
+ this.hour = (hour < 10) ? "0" : "" + hour;
+ this.min = (min < 10) ? "0" : "" + min;
+ this.sec = (sec < 10) ? "0" : "" + sec;
+ }
+
+ public long getChrononTime() {
+ return chrononTime;
+ }
+
+ public String getHour() {
+ return hour;
+ }
+
+ public String getMin() {
+ return min;
+ }
+
+ public String getSec() {
+ return sec;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("datetime");
+ builder.append("(\"");
+ builder.append(super.getYear());
+ builder.append("-");
+ builder.append(super.getMonth() < 10 ? "0" + super.getMonth() : super.getMonth());
+ builder.append("-");
+ builder.append(super.getDay() < 10 ? "0" + super.getDay() : super.getDay());
+ builder.append("T");
+ builder.append(hour + ":" + min + ":" + sec);
+ builder.append("\")");
+ return builder.toString();
+ }
+ }
+
+ public static class Message {
+
+ private char[] message = new char[500];
+ private List<String> referredTopics;
+ private int length;
+
+ public Message(char[] m, List<String> referredTopics) {
+ System.arraycopy(m, 0, message, 0, m.length);
+ length = m.length;
+ this.referredTopics = referredTopics;
+ }
+
+ public Message() {
+ referredTopics = new ArrayList<String>();
+ length = 0;
+ }
+
+ public char[] getMessage() {
+ return message;
+ }
+
+ public List<String> getReferredTopics() {
+ return referredTopics;
+ }
+
+ public void reset(char[] m, int offset, int length, List<String> referredTopics) {
+ System.arraycopy(m, offset, message, 0, length);
+ this.length = length;
+ this.referredTopics = referredTopics;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public char charAt(int index) {
+ return message[index];
+ }
+
+ }
+
+ public static class Point {
+
+ private float latitude;
+ private float longitude;
+
+ public float getLatitude() {
+ return latitude;
+ }
+
+ public float getLongitude() {
+ return longitude;
+ }
+
+ public Point(float latitude, float longitude) {
+ this.latitude = latitude;
+ this.longitude = longitude;
+ }
+
+ public void reset(float latitude, float longitude) {
+ this.latitude = latitude;
+ this.longitude = longitude;
+ }
+
+ public Point() {
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("point(\"" + latitude + "," + longitude + "\")");
+ return builder.toString();
+ }
+ }
+
+ public static class RandomNameGenerator {
+
+ private String[] firstNames;
+ private String[] lastNames;
+
+ private final Random random = new Random();
+
+ private final String[] connectors = new String[] { "_", "#", "$", "@" };
+
+ public RandomNameGenerator(String firstNameFilePath, String lastNameFilePath) throws IOException {
+ firstNames = FileUtil.listyFile(new File(firstNameFilePath)).toArray(new String[] {});
+ lastNames = FileUtil.listyFile(new File(lastNameFilePath)).toArray(new String[] {});
+ }
+
+ public RandomNameGenerator(String[] firstNames, String[] lastNames) {
+ this.firstNames = firstNames;
+ this.lastNames = lastNames;
+ }
+
+ public String getRandomName() {
+ String name;
+ name = getSuggestedName();
+ return name;
+
+ }
+
+ private String getSuggestedName() {
+ int firstNameIndex = random.nextInt(firstNames.length);
+ int lastNameIndex = random.nextInt(lastNames.length);
+ String suggestedName = firstNames[firstNameIndex] + " " + lastNames[lastNameIndex];
+ return suggestedName;
+ }
+
+ public String getRandomNameSuffix() {
+ return connectors[random.nextInt(connectors.length)] + random.nextInt(1000);
+ }
+ }
+
+ public static class RandomMessageGenerator {
+
+ private final MessageTemplate messageTemplate;
+
+ public RandomMessageGenerator(String vendorFilePath, String jargonFilePath) throws IOException {
+ List<String> vendors = FileUtil.listyFile(new File(vendorFilePath));
+ List<String> jargon = FileUtil.listyFile(new File(jargonFilePath));
+ this.messageTemplate = new MessageTemplate(vendors, jargon);
+ }
+
+ public RandomMessageGenerator(String[] vendors, String[] jargon) {
+ List<String> vendorList = new ArrayList<String>();
+ for (String v : vendors) {
+ vendorList.add(v);
+ }
+ List<String> jargonList = new ArrayList<String>();
+ for (String j : jargon) {
+ jargonList.add(j);
+ }
+ this.messageTemplate = new MessageTemplate(vendorList, jargonList);
+ }
+
+ public Message getNextRandomMessage() {
+ return messageTemplate.getNextMessage();
+ }
+ }
+
+ public static class AbstractMessageTemplate {
+
+ protected final Random random = new Random();
+
+ protected String[] positiveVerbs = new String[] { "like", "love" };
+ protected String[] negativeVerbs = new String[] { "dislike", "hate", "can't stand" };
+
+ protected String[] negativeAdjectives = new String[] { "horrible", "bad", "terrible", "OMG" };
+ protected String[] postiveAdjectives = new String[] { "good", "awesome", "amazing", "mind-blowing" };
+
+ protected String[] otherWords = new String[] { "the", "its" };
+ }
+
+ public static class MessageTemplate extends AbstractMessageTemplate {
+
+ private List<String> vendors;
+ private List<String> jargon;
+ private CharBuffer buffer;
+ private List<String> referredTopics;
+ private Message message = new Message();
+
+ public MessageTemplate(List<String> vendors, List<String> jargon) {
+ this.vendors = vendors;
+ this.jargon = jargon;
+ buffer = CharBuffer.allocate(2500);
+ referredTopics = new ArrayList<String>();
+ }
+
+ public Message getNextMessage() {
+ buffer.position(0);
+ buffer.limit(2500);
+ referredTopics.clear();
+ boolean isPositive = random.nextBoolean();
+ String[] verbArray = isPositive ? positiveVerbs : negativeVerbs;
+ String[] adjectiveArray = isPositive ? postiveAdjectives : negativeAdjectives;
+ String verb = verbArray[random.nextInt(verbArray.length)];
+ String adjective = adjectiveArray[random.nextInt(adjectiveArray.length)];
+
+ buffer.put(" ");
+ buffer.put(verb);
+ buffer.put(" ");
+ String vendor = vendors.get(random.nextInt(vendors.size()));
+ referredTopics.add(vendor);
+ buffer.append(vendor);
+ buffer.append(" ");
+ buffer.append(otherWords[random.nextInt(otherWords.length)]);
+ buffer.append(" ");
+ String jargonTerm = jargon.get(random.nextInt(jargon.size()));
+ referredTopics.add(jargonTerm);
+ buffer.append(jargonTerm);
+ buffer.append(" is ");
+ buffer.append(adjective);
+ if (random.nextBoolean()) {
+ buffer.append(isPositive ? ":)" : ":(");
+ }
+
+ buffer.flip();
+ message.reset(buffer.array(), 0, buffer.limit(), referredTopics);
+ return message;
+ }
+ }
+
+ public static class RandomUtil {
+
+ public static Random random = new Random();
+
+ public static int[] getKFromN(int k, int n) {
+ int[] result = new int[k];
+ int cnt = 0;
+ HashSet<Integer> values = new HashSet<Integer>();
+ while (cnt < k) {
+ int val = random.nextInt(n + 1);
+ if (values.contains(val)) {
+ continue;
+ }
+
+ result[cnt++] = val;
+ values.add(val);
+ }
+ return result;
+ }
+ }
+
+ public static class FileUtil {
+
+ public static List<String> listyFile(File file) throws IOException {
+
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+ String line;
+ List<String> list = new ArrayList<String>();
+ while (true) {
+ line = reader.readLine();
+ if (line == null) {
+ break;
+ }
+ list.add(line);
+ }
+ return list;
+ }
+
+ public static FileAppender getFileAppender(String filePath, boolean createIfNotExists, boolean overwrite)
+ throws IOException {
+ return new FileAppender(filePath, createIfNotExists, overwrite);
+ }
+ }
+
+ public static class FileAppender {
+
+ private final BufferedWriter writer;
+
+ public FileAppender(String filePath, boolean createIfNotExists, boolean overwrite) throws IOException {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ if (createIfNotExists) {
+ new File(file.getParent()).mkdirs();
+ } else {
+ throw new IOException("path " + filePath + " does not exists");
+ }
+ }
+ this.writer = new BufferedWriter(new FileWriter(file, !overwrite));
+ }
+
+ public void appendToFile(String content) throws IOException {
+ writer.append(content);
+ writer.append("\n");
+ }
+
+ public void close() throws IOException {
+ writer.close();
+ }
+ }
+
+ public static class RandomEmploymentGenerator {
+
+ private final int percentEmployed;
+ private final Random random = new Random();
+ private final RandomDateGenerator randDateGen;
+ private final List<String> organizations;
+ private Employment emp;
+
+ public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String orgListPath)
+ throws IOException {
+ this.percentEmployed = percentEmployed;
+ this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
+ organizations = FileUtil.listyFile(new File(orgListPath));
+ emp = new Employment();
+ }
+
+ public RandomEmploymentGenerator(int percentEmployed, Date beginEmpDate, Date endEmpDate, String[] orgList) {
+ this.percentEmployed = percentEmployed;
+ this.randDateGen = new RandomDateGenerator(beginEmpDate, endEmpDate);
+ organizations = new ArrayList<String>();
+ for (String org : orgList) {
+ organizations.add(org);
+ }
+ emp = new Employment();
+ }
+
+ public Employment getRandomEmployment() {
+ int empployed = random.nextInt(100) + 1;
+ boolean isEmployed = false;
+ if (empployed <= percentEmployed) {
+ isEmployed = true;
+ }
+ Date startDate = randDateGen.getNextRandomDate();
+ Date endDate = null;
+ if (!isEmployed) {
+ endDate = randDateGen.getNextRecentDate(startDate);
+ }
+ String org = organizations.get(random.nextInt(organizations.size()));
+ emp.reset(org, startDate, endDate);
+ return emp;
+ }
+ }
+
+ public static class RandomLocationGenerator {
+
+ private Random random = new Random();
+
+ private final int beginLat;
+ private final int endLat;
+ private final int beginLong;
+ private final int endLong;
+
+ private Point point;
+
+ public RandomLocationGenerator(int beginLat, int endLat, int beginLong, int endLong) {
+ this.beginLat = beginLat;
+ this.endLat = endLat;
+ this.beginLong = beginLong;
+ this.endLong = endLong;
+ this.point = new Point();
+ }
+
+ public Point getRandomPoint() {
+ int latMajor = beginLat + random.nextInt(endLat - beginLat);
+ int latMinor = random.nextInt(100);
+ float latitude = latMajor + ((float) latMinor) / 100;
+
+ int longMajor = beginLong + random.nextInt(endLong - beginLong);
+ int longMinor = random.nextInt(100);
+ float longitude = longMajor + ((float) longMinor) / 100;
+
+ point.reset(latitude, longitude);
+ return point;
+ }
+
+ }
+
+ public static class PartitionConfiguration {
+
+ private final TargetPartition targetPartition;
+ private final SourcePartition sourcePartition;
+
+ public PartitionConfiguration(SourcePartition sourcePartition, TargetPartition targetPartition) {
+ this.sourcePartition = sourcePartition;
+ this.targetPartition = targetPartition;
+ }
+
+ public TargetPartition getTargetPartition() {
+ return targetPartition;
+ }
+
+ public SourcePartition getSourcePartition() {
+ return sourcePartition;
+ }
+
+ }
+
+ public static class SourcePartition {
+
+ private final String name;
+ private final String host;
+ private final String path;
+
+ public SourcePartition(String name, String host, String path) {
+ this.name = name;
+ this.host = host;
+ this.path = path;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getPath() {
+ return path;
+ }
+ }
+
+ public static class TargetPartition {
+ private final String name;
+ private final String host;
+ private final String path;
+ private final int fbUserKeyMin;
+ private final int fbUserKeyMax;
+ private final int twUserKeyMin;
+ private final int twUserKeyMax;
+ private final int fbMessageIdMin;
+ private final int fbMessageIdMax;
+ private final int twMessageIdMin;
+ private final int twMessageIdMax;
+ private final int commonUsers;
+
+ public TargetPartition(String partitionName, String host, String path, int fbUserKeyMin, int fbUserKeyMax,
+ int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
+ int twMessageIdMax, int commonUsers) {
+ this.name = partitionName;
+ this.host = host;
+ this.path = path;
+ this.fbUserKeyMin = fbUserKeyMin;
+ this.fbUserKeyMax = fbUserKeyMax;
+ this.twUserKeyMin = twUserKeyMin;
+ this.twUserKeyMax = twUserKeyMax;
+ this.twMessageIdMin = twMessageIdMin;
+ this.twMessageIdMax = twMessageIdMax;
+ this.fbMessageIdMin = fbMessageIdMin;
+ this.fbMessageIdMax = fbMessageIdMax;
+ this.commonUsers = commonUsers;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(name);
+ builder.append(" ");
+ builder.append(host);
+ builder.append("\n");
+ builder.append(path);
+ builder.append("\n");
+ builder.append("fbUser:key:min");
+ builder.append(fbUserKeyMin);
+
+ builder.append("\n");
+ builder.append("fbUser:key:max");
+ builder.append(fbUserKeyMax);
+
+ builder.append("\n");
+ builder.append("twUser:key:min");
+ builder.append(twUserKeyMin);
+
+ builder.append("\n");
+ builder.append("twUser:key:max");
+ builder.append(twUserKeyMax);
+
+ builder.append("\n");
+ builder.append("fbMessage:key:min");
+ builder.append(fbMessageIdMin);
+
+ builder.append("\n");
+ builder.append("fbMessage:key:max");
+ builder.append(fbMessageIdMax);
+
+ builder.append("\n");
+ builder.append("twMessage:key:min");
+ builder.append(twMessageIdMin);
+
+ builder.append("\n");
+ builder.append("twMessage:key:max");
+ builder.append(twMessageIdMax);
+
+ builder.append("\n");
+ builder.append("twMessage:key:max");
+ builder.append(twMessageIdMax);
+
+ builder.append("\n");
+ builder.append("commonUsers");
+ builder.append(commonUsers);
+
+ return new String(builder);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getFbUserKeyMin() {
+ return fbUserKeyMin;
+ }
+
+ public int getFbUserKeyMax() {
+ return fbUserKeyMax;
+ }
+
+ public int getTwUserKeyMin() {
+ return twUserKeyMin;
+ }
+
+ public int getTwUserKeyMax() {
+ return twUserKeyMax;
+ }
+
+ public int getFbMessageIdMin() {
+ return fbMessageIdMin;
+ }
+
+ public int getFbMessageIdMax() {
+ return fbMessageIdMax;
+ }
+
+ public int getTwMessageIdMin() {
+ return twMessageIdMin;
+ }
+
+ public int getTwMessageIdMax() {
+ return twMessageIdMax;
+ }
+
+ public int getCommonUsers() {
+ return commonUsers;
+ }
+
+ public String getPath() {
+ return path;
+ }
+ }
+
+ public static class Employment {
+
+ private String organization;
+ private Date startDate;
+ private Date endDate;
+
+ public Employment(String organization, Date startDate, Date endDate) {
+ this.organization = organization;
+ this.startDate = startDate;
+ this.endDate = endDate;
+ }
+
+ public Employment() {
+ }
+
+ public void reset(String organization, Date startDate, Date endDate) {
+ this.organization = organization;
+ this.startDate = startDate;
+ this.endDate = endDate;
+ }
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public Date getStartDate() {
+ return startDate;
+ }
+
+ public Date getEndDate() {
+ return endDate;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder("");
+ builder.append("{");
+ builder.append("\"organization-name\":");
+ builder.append("\"" + organization + "\"");
+ builder.append(",");
+ builder.append("\"start-date\":");
+ builder.append(startDate);
+ if (endDate != null) {
+ builder.append(",");
+ builder.append("\"end-date\":");
+ builder.append(endDate);
+ }
+ builder.append("}");
+ return new String(builder);
+ }
+
+ }
+
+ public static class FacebookMessage {
+
+ private int messageId;
+ private int authorId;
+ private int inResponseTo;
+ private Point senderLocation;
+ private Message message;
+
+ public int getMessageId() {
+ return messageId;
+ }
+
+ public int getAuthorID() {
+ return authorId;
+ }
+
+ public Point getSenderLocation() {
+ return senderLocation;
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ public int getInResponseTo() {
+ return inResponseTo;
+ }
+
+ public FacebookMessage() {
+
+ }
+
+ public FacebookMessage(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
+ this.messageId = messageId;
+ this.authorId = authorId;
+ this.inResponseTo = inResponseTo;
+ this.senderLocation = senderLocation;
+ this.message = message;
+ }
+
+ public void reset(int messageId, int authorId, int inResponseTo, Point senderLocation, Message message) {
+ this.messageId = messageId;
+ this.authorId = authorId;
+ this.inResponseTo = inResponseTo;
+ this.senderLocation = senderLocation;
+ this.message = message;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ builder.append("\"message-id\":");
+ builder.append(messageId);
+ builder.append(",");
+ builder.append("\"author-id\":");
+ builder.append(authorId);
+ builder.append(",");
+ builder.append("\"in-response-to\":");
+ builder.append(inResponseTo);
+ builder.append(",");
+ builder.append("\"sender-location\":");
+ builder.append(senderLocation);
+ builder.append(",");
+ builder.append("\"message\":");
+ builder.append("\"");
+ for (int i = 0; i < message.getLength(); i++) {
+ builder.append(message.charAt(i));
+ }
+ builder.append("\"");
+ builder.append("}");
+ return new String(builder);
+ }
+ }
+
+ public static class FacebookUser {
+
+ private int id;
+ private String alias;
+ private String name;
+ private String userSince;
+ private int[] friendIds;
+ private Employment employment;
+
+ public FacebookUser() {
+
+ }
+
+ public FacebookUser(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
+ this.id = id;
+ this.alias = alias;
+ this.name = name;
+ this.userSince = userSince;
+ this.friendIds = friendIds;
+ this.employment = employment;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getUserSince() {
+ return userSince;
+ }
+
+ public int[] getFriendIds() {
+ return friendIds;
+ }
+
+ public Employment getEmployment() {
+ return employment;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ builder.append("\"id\":" + id);
+ builder.append(",");
+ builder.append("\"alias\":" + "\"" + alias + "\"");
+ builder.append(",");
+ builder.append("\"name\":" + "\"" + name + "\"");
+ builder.append(",");
+ builder.append("\"user-since\":" + userSince);
+ builder.append(",");
+ builder.append("\"friend-ids\":");
+ builder.append("{{");
+ for (int i = 0; i < friendIds.length; i++) {
+ builder.append(friendIds[i]);
+ builder.append(",");
+ }
+ if (friendIds.length > 0) {
+ builder.deleteCharAt(builder.lastIndexOf(","));
+ }
+ builder.append("}}");
+ builder.append(",");
+ builder.append("\"employment\":");
+ builder.append("[");
+ builder.append(employment.toString());
+ builder.append("]");
+ builder.append("}");
+ return builder.toString();
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public void setAlias(String alias) {
+ this.alias = alias;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setUserSince(String userSince) {
+ this.userSince = userSince;
+ }
+
+ public void setFriendIds(int[] friendIds) {
+ this.friendIds = friendIds;
+ }
+
+ public void setEmployment(Employment employment) {
+ this.employment = employment;
+ }
+
+ public void reset(int id, String alias, String name, String userSince, int[] friendIds, Employment employment) {
+ this.id = id;
+ this.alias = alias;
+ this.name = name;
+ this.userSince = userSince;
+ this.friendIds = friendIds;
+ this.employment = employment;
+ }
+ }
+
+ public static class TweetMessage {
+
+ private String tweetid;
+ private TwitterUser user;
+ private Point senderLocation;
+ private DateTime sendTime;
+ private List<String> referredTopics;
+ private Message messageText;
+
+ public TweetMessage() {
+
+ }
+
+ public TweetMessage(String tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
+ List<String> referredTopics, Message messageText) {
+ this.tweetid = tweetid;
+ this.user = user;
+ this.senderLocation = senderLocation;
+ this.sendTime = sendTime;
+ this.referredTopics = referredTopics;
+ this.messageText = messageText;
+ }
+
+ public void reset(String tweetid, TwitterUser user, Point senderLocation, DateTime sendTime,
+ List<String> referredTopics, Message messageText) {
+ this.tweetid = tweetid;
+ this.user = user;
+ this.senderLocation = senderLocation;
+ this.sendTime = sendTime;
+ this.referredTopics = referredTopics;
+ this.messageText = messageText;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ builder.append("\"tweetid\":");
+ builder.append("\"" + tweetid + "\"");
+ builder.append(",");
+ builder.append("\"user\":");
+ builder.append(user);
+ builder.append(",");
+ builder.append("\"sender-location\":");
+ builder.append(senderLocation);
+ builder.append(",");
+ builder.append("\"send-time\":");
+ builder.append(sendTime);
+ builder.append(",");
+ builder.append("\"referred-topics\":");
+ builder.append("{{");
+ for (String topic : referredTopics) {
+ builder.append("\"" + topic + "\"");
+ builder.append(",");
+ }
+ if (referredTopics.size() > 0) {
+ builder.deleteCharAt(builder.lastIndexOf(","));
+ }
+ builder.append("}}");
+ builder.append(",");
+ builder.append("\"message-text\":");
+ builder.append("\"");
+ for (int i = 0; i < messageText.getLength(); i++) {
+ builder.append(messageText.charAt(i));
+ }
+ builder.append("\"");
+ builder.append("}");
+ return new String(builder);
+ }
+
+ public String getTweetid() {
+ return tweetid;
+ }
+
+ public void setTweetid(String tweetid) {
+ this.tweetid = tweetid;
+ }
+
+ public TwitterUser getUser() {
+ return user;
+ }
+
+ public void setUser(TwitterUser user) {
+ this.user = user;
+ }
+
+ public Point getSenderLocation() {
+ return senderLocation;
+ }
+
+ public void setSenderLocation(Point senderLocation) {
+ this.senderLocation = senderLocation;
+ }
+
+ public DateTime getSendTime() {
+ return sendTime;
+ }
+
+ public void setSendTime(DateTime sendTime) {
+ this.sendTime = sendTime;
+ }
+
+ public List<String> getReferredTopics() {
+ return referredTopics;
+ }
+
+ public void setReferredTopics(List<String> referredTopics) {
+ this.referredTopics = referredTopics;
+ }
+
+ public Message getMessageText() {
+ return messageText;
+ }
+
+ public void setMessageText(Message messageText) {
+ this.messageText = messageText;
+ }
+
+ }
+
+ public static class TwitterUser {
+
+ private String screenName;
+ private String lang = "en";
+ private int friendsCount;
+ private int statusesCount;
+ private String name;
+ private int followersCount;
+
+ public TwitterUser() {
+
+ }
+
+ public TwitterUser(String screenName, int friendsCount, int statusesCount, String name, int followersCount) {
+ this.screenName = screenName;
+ this.friendsCount = friendsCount;
+ this.statusesCount = statusesCount;
+ this.name = name;
+ this.followersCount = followersCount;
+ }
+
+ public void reset(String screenName, int friendsCount, int statusesCount, String name, int followersCount) {
+ this.screenName = screenName;
+ this.friendsCount = friendsCount;
+ this.statusesCount = statusesCount;
+ this.name = name;
+ this.followersCount = followersCount;
+ }
+
+ public String getScreenName() {
+ return screenName;
+ }
+
+ public int getFriendsCount() {
+ return friendsCount;
+ }
+
+ public int getStatusesCount() {
+ return statusesCount;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getFollowersCount() {
+ return followersCount;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("{");
+ builder.append("\"screen-name\":" + "\"" + screenName + "\"");
+ builder.append(",");
+ builder.append("\"lang\":" + "\"" + lang + "\"");
+ builder.append(",");
+ builder.append("\"friends_count\":" + friendsCount);
+ builder.append(",");
+ builder.append("\"statuses_count\":" + statusesCount);
+ builder.append(",");
+ builder.append("\"name\":" + "\"" + name + "\"");
+ builder.append(",");
+ builder.append("\"followers_count\":" + followersCount);
+ builder.append("}");
+ return builder.toString();
+ }
+
+ }
+
+ public static class DistributionHandler {
+
+ private final ZipfGenerator zipfGen;
+ private final int totalUsers;
+ private final int totalMessages;
+ private Random random = new Random();
+
+ public DistributionHandler(int totalMessages, double skew, int totalNumUsers) {
+ zipfGen = new ZipfGenerator(totalMessages, skew);
+ totalUsers = totalNumUsers;
+ this.totalMessages = totalMessages;
+ }
+
+ public int getFromDistribution(int rank) {
+ double prob = zipfGen.getProbability(rank);
+ int numMessages = (int) (prob * totalMessages);
+ return numMessages;
+ }
+
+ public static void main(String args[]) {
+ int totalMessages = 1000 * 4070;
+ double skew = 0.5;
+ int totalUsers = 4070;
+ DistributionHandler d = new DistributionHandler(totalMessages, skew, totalUsers);
+ int sum = 0;
+ for (int i = totalUsers; i >= 1; i--) {
+ float contrib = d.getFromDistribution(i);
+ sum += contrib;
+ System.out.println(i + ":" + contrib);
+ }
+
+ System.out.println("SUM" + ":" + sum);
+
+ }
+ }
+
+ public static class ZipfGenerator {
+
+ private Random rnd = new Random(System.currentTimeMillis());
+ private int size;
+ private double skew;
+ private double bottom = 0;
+
+ public ZipfGenerator(int size, double skew) {
+ this.size = size;
+ this.skew = skew;
+ for (int i = 1; i < size; i++) {
+ this.bottom += (1 / Math.pow(i, this.skew));
+ }
+ }
+
+ // the next() method returns an rank id. The frequency of returned rank
+ // ids are follows Zipf distribution.
+ public int next() {
+ int rank;
+ double friquency = 0;
+ double dice;
+ rank = rnd.nextInt(size);
+ friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+ dice = rnd.nextDouble();
+ while (!(dice < friquency)) {
+ rank = rnd.nextInt(size);
+ friquency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+ dice = rnd.nextDouble();
+ }
+ return rank;
+ }
+
+ // This method returns a probability that the given rank occurs.
+ public double getProbability(int rank) {
+ return (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+ }
+
+ public static void main(String[] args) throws IOException {
+ int total = (int) (3.7 * 1000 * 1000);
+ int skew = 2;
+ int numUsers = 1000 * 1000;
+ /*
+ * if (args.length != 2) { System.out.println("usage:" +
+ * "./zipf size skew"); System.exit(-1); }
+ */
+ BufferedWriter buf = new BufferedWriter(new FileWriter(new File("/tmp/zip_output")));
+ ZipfGenerator zipf = new ZipfGenerator(total, skew);
+ double sum = 0;
+ for (int i = 1; i <= numUsers; i++) {
+ double prob = zipf.getProbability(i);
+ double contribution = (double) (prob * total);
+ String contrib = i + ":" + contribution;
+ buf.write(contrib);
+ buf.write("\n");
+ System.out.println(contrib);
+ sum += contribution;
+ }
+ System.out.println("sum is :" + sum);
+ }
+ }
+
+ public static class PartitionElement implements ILibraryElement {
+ private final String name;
+ private final String host;
+ private final int fbUserKeyMin;
+ private final int fbUserKeyMax;
+ private final int twUserKeyMin;
+ private final int twUserKeyMax;
+ private final int fbMessageIdMin;
+ private final int fbMessageIdMax;
+ private final int twMessageIdMin;
+ private final int twMessageIdMax;
+
+ public PartitionElement(String partitionName, String host, int fbUserKeyMin, int fbUserKeyMax,
+ int twUserKeyMin, int twUserKeyMax, int fbMessageIdMin, int fbMessageIdMax, int twMessageIdMin,
+ int twMessageIdMax) {
+ this.name = partitionName;
+ this.host = host;
+ this.fbUserKeyMin = fbUserKeyMin;
+ this.fbUserKeyMax = fbUserKeyMax;
+ this.twUserKeyMin = twUserKeyMax;
+ this.twUserKeyMax = twUserKeyMax;
+ this.twMessageIdMin = twMessageIdMin;
+ this.twMessageIdMax = twMessageIdMax;
+ this.fbMessageIdMin = fbMessageIdMin;
+ this.fbMessageIdMax = fbMessageIdMax;
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(name);
+ builder.append(" ");
+ builder.append(host);
+ builder.append("\n");
+ builder.append("fbUser:key:min");
+ builder.append(fbUserKeyMin);
+
+ builder.append("\n");
+ builder.append("fbUser:key:max");
+ builder.append(fbUserKeyMax);
+
+ builder.append("\n");
+ builder.append("twUser:key:min");
+ builder.append(twUserKeyMin);
+
+ builder.append("\n");
+ builder.append("twUser:key:max");
+ builder.append(twUserKeyMax);
+
+ builder.append("\n");
+ builder.append("fbMessage:key:min");
+ builder.append(fbMessageIdMin);
+
+ builder.append("\n");
+ builder.append("fbMessage:key:max");
+ builder.append(fbMessageIdMax);
+
+ builder.append("\n");
+ builder.append("twMessage:key:min");
+ builder.append(twMessageIdMin);
+
+ builder.append("\n");
+ builder.append("twMessage:key:max");
+ builder.append(twMessageIdMax);
+
+ builder.append("\n");
+ builder.append("twMessage:key:max");
+ builder.append(twUserKeyMin);
+
+ return new String(builder);
+ }
+
+ @Override
+ public String getName() {
+ return "Partition";
+ }
+
+ }
+
+ interface ILibraryElement {
+
+ public enum ElementType {
+ PARTITION
+ }
+
+ public String getName();
+
+ }
+
+ public static class Configuration {
+
+ private final float numMB;
+ private final String unit;
+
+ private final List<SourcePartition> sourcePartitions;
+ private List<TargetPartition> targetPartitions;
+
+ public Configuration(float numMB, String unit, List<SourcePartition> partitions) throws IOException {
+ this.numMB = numMB;
+ this.unit = unit;
+ this.sourcePartitions = partitions;
+
+ }
+
+ public float getNumMB() {
+ return numMB;
+ }
+
+ public String getUnit() {
+ return unit;
+ }
+
+ public List<SourcePartition> getSourcePartitions() {
+ return sourcePartitions;
+ }
+
+ public List<TargetPartition> getTargetPartitions() {
+ return targetPartitions;
+ }
+
+ public void setTargetPartitions(List<TargetPartition> targetPartitions) {
+ this.targetPartitions = targetPartitions;
+ }
+
+ }
+
+ public static class XMLUtil {
+
+ public static void writeToXML(Configuration conf, String filePath) throws IOException,
+ ParserConfigurationException, TransformerException {
+
+ DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+
+ // root elements
+ Document doc = docBuilder.newDocument();
+ Element rootElement = doc.createElement("Partitions");
+ doc.appendChild(rootElement);
+
+ int index = 0;
+ for (TargetPartition partition : conf.getTargetPartitions()) {
+ writePartitionElement(conf.getSourcePartitions().get(index), partition, rootElement, doc);
+ }
+
+ TransformerFactory transformerFactory = TransformerFactory.newInstance();
+ Transformer transformer = transformerFactory.newTransformer();
+
+ transformer.setOutputProperty(OutputKeys.ENCODING, "utf-8");
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+
+ DOMSource source = new DOMSource(doc);
+ StreamResult result = new StreamResult(new File(filePath));
+
+ transformer.transform(source, result);
+
+ }
+
+ public static void writePartitionInfo(Configuration conf, String filePath) throws IOException {
+ BufferedWriter bw = new BufferedWriter(new FileWriter(filePath));
+ for (SourcePartition sp : conf.getSourcePartitions()) {
+ bw.write(sp.getHost() + ":" + sp.getName() + ":" + sp.getPath());
+ bw.write("\n");
+ }
+ bw.close();
+ }
+
+ public static Document getDocument(String filePath) throws Exception {
+ File inputFile = new File(filePath);
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = dbf.newDocumentBuilder();
+ Document doc = db.parse(inputFile);
+ doc.getDocumentElement().normalize();
+ return doc;
+ }
+
+ public static Configuration getConfiguration(String filePath) throws Exception {
+ Configuration conf = getConfiguration(getDocument(filePath));
+ PartitionMetrics metrics = new PartitionMetrics(conf.getNumMB(), conf.getUnit(), conf.getSourcePartitions()
+ .size());
+ List<TargetPartition> targetPartitions = getTargetPartitions(metrics, conf.getSourcePartitions());
+ conf.setTargetPartitions(targetPartitions);
+ return conf;
+ }
+
+ public static Configuration getConfiguration(Document document) throws IOException {
+ Element rootEle = document.getDocumentElement();
+ NodeList nodeList = rootEle.getChildNodes();
+ float size = Float.parseFloat(getStringValue((Element) nodeList, "size"));
+ String unit = getStringValue((Element) nodeList, "unit");
+ List<SourcePartition> sourcePartitions = getSourcePartitions(document);
+ return new Configuration(size, unit, sourcePartitions);
+ }
+
+ public static List<SourcePartition> getSourcePartitions(Document document) {
+ Element rootEle = document.getDocumentElement();
+ NodeList nodeList = rootEle.getElementsByTagName("partition");
+ List<SourcePartition> sourcePartitions = new ArrayList<SourcePartition>();
+ for (int i = 0; i < nodeList.getLength(); i++) {
+ Node node = nodeList.item(i);
+ sourcePartitions.add(getSourcePartition((Element) node));
+ }
+ return sourcePartitions;
+ }
+
+ public static SourcePartition getSourcePartition(Element functionElement) {
+ String name = getStringValue(functionElement, "name");
+ String host = getStringValue(functionElement, "host");
+ String path = getStringValue(functionElement, "path");
+ SourcePartition sp = new SourcePartition(name, host, path);
+ return sp;
+ }
+
+ public static String getStringValue(Element element, String tagName) {
+ String textValue = null;
+ NodeList nl = element.getElementsByTagName(tagName);
+ if (nl != null && nl.getLength() > 0) {
+ Element el = (Element) nl.item(0);
+ textValue = el.getFirstChild().getNodeValue();
+ }
+ return textValue;
+ }
+
+ public static PartitionConfiguration getPartitionConfiguration(String filePath, String partitionName)
+ throws Exception {
+ PartitionConfiguration pconf = getPartitionConfigurations(getDocument(filePath), partitionName);
+ return pconf;
+ }
+
+ public static PartitionConfiguration getPartitionConfigurations(Document document, String partitionName)
+ throws IOException {
+
+ Element rootEle = document.getDocumentElement();
+ NodeList nodeList = rootEle.getElementsByTagName("Partition");
+
+ for (int i = 0; i < nodeList.getLength(); i++) {
+ Node node = nodeList.item(i);
+ Element nodeElement = (Element) node;
+ String name = getStringValue(nodeElement, "name");
+ if (!name.equalsIgnoreCase(partitionName)) {
+ continue;
+ }
+ String host = getStringValue(nodeElement, "host");
+ String path = getStringValue(nodeElement, "path");
+
+ String fbUserKeyMin = getStringValue(nodeElement, "fbUserKeyMin");
+ String fbUserKeyMax = getStringValue(nodeElement, "fbUserKeyMax");
+ String twUserKeyMin = getStringValue(nodeElement, "twUserKeyMin");
+ String twUserKeyMax = getStringValue(nodeElement, "twUserKeyMax");
+ String fbMessageKeyMin = getStringValue(nodeElement, "fbMessageKeyMin");
+
+ String fbMessageKeyMax = getStringValue(nodeElement, "fbMessageKeyMax");
+ String twMessageKeyMin = getStringValue(nodeElement, "twMessageKeyMin");
+ String twMessageKeyMax = getStringValue(nodeElement, "twMessageKeyMax");
+ String numCommonUsers = getStringValue(nodeElement, "numCommonUsers");
+
+ SourcePartition sp = new SourcePartition(name, host, path);
+
+ TargetPartition tp = new TargetPartition(partitionName, host, path, Integer.parseInt(fbUserKeyMin),
+ Integer.parseInt(fbUserKeyMax), Integer.parseInt(twUserKeyMin), Integer.parseInt(twUserKeyMax),
+ Integer.parseInt(fbMessageKeyMin), Integer.parseInt(fbMessageKeyMax),
+ Integer.parseInt(twMessageKeyMin), Integer.parseInt(twMessageKeyMax),
+ Integer.parseInt(numCommonUsers));
+ PartitionConfiguration pc = new PartitionConfiguration(sp, tp);
+ return pc;
+ }
+ return null;
+ }
+
+ public static List<TargetPartition> getTargetPartitions(PartitionMetrics metrics,
+ List<SourcePartition> sourcePartitions) {
+ List<TargetPartition> partitions = new ArrayList<TargetPartition>();
+ int fbUserKeyMin = 1;
+ int twUserKeyMin = 1;
+ int fbMessageIdMin = 1;
+ int twMessageIdMin = 1;
+
+ for (SourcePartition sp : sourcePartitions) {
+ int fbUserKeyMax = fbUserKeyMin + metrics.getFbOnlyUsers() + metrics.getCommonUsers() - 1;
+ int twUserKeyMax = twUserKeyMin + metrics.getTwitterOnlyUsers() + metrics.getCommonUsers() - 1;
+
+ int fbMessageIdMax = fbMessageIdMin + metrics.getFbMessages() - 1;
+ int twMessageIdMax = twMessageIdMin + metrics.getTwMessages() - 1;
+ TargetPartition pe = new TargetPartition(sp.getName(), sp.getHost(), sp.getPath(), fbUserKeyMin,
+ fbUserKeyMax, twUserKeyMin, twUserKeyMax, fbMessageIdMin, fbMessageIdMax, twMessageIdMin,
+ twMessageIdMax, metrics.getCommonUsers());
+ partitions.add(pe);
+
+ fbUserKeyMin = fbUserKeyMax + 1;
+ twUserKeyMin = twUserKeyMax + 1;
+
+ fbMessageIdMin = fbMessageIdMax + 1;
+ twMessageIdMin = twMessageIdMax + 1;
+ }
+
+ return partitions;
+ }
+
+ public static void writePartitionElement(SourcePartition sourcePartition, TargetPartition partition,
+ Element rootElement, Document doc) {
+ // staff elements
+ Element pe = doc.createElement("Partition");
+ rootElement.appendChild(pe);
+
+ // name element
+ Element name = doc.createElement("name");
+ name.appendChild(doc.createTextNode("" + partition.getName()));
+ pe.appendChild(name);
+
+ // host element
+ Element host = doc.createElement("host");
+ host.appendChild(doc.createTextNode("" + partition.getHost()));
+ pe.appendChild(host);
+
+ // path element
+ Element path = doc.createElement("path");
+ path.appendChild(doc.createTextNode("" + partition.getPath()));
+ pe.appendChild(path);
+
+ // fbUserKeyMin element
+ Element fbUserKeyMin = doc.createElement("fbUserKeyMin");
+ fbUserKeyMin.appendChild(doc.createTextNode("" + partition.getFbUserKeyMin()));
+ pe.appendChild(fbUserKeyMin);
+
+ // fbUserKeyMax element
+ Element fbUserKeyMax = doc.createElement("fbUserKeyMax");
+ fbUserKeyMax.appendChild(doc.createTextNode("" + partition.getFbUserKeyMax()));
+ pe.appendChild(fbUserKeyMax);
+
+ // twUserKeyMin element
+ Element twUserKeyMin = doc.createElement("twUserKeyMin");
+ twUserKeyMin.appendChild(doc.createTextNode("" + partition.getTwUserKeyMin()));
+ pe.appendChild(twUserKeyMin);
+
+ // twUserKeyMax element
+ Element twUserKeyMax = doc.createElement("twUserKeyMax");
+ twUserKeyMax.appendChild(doc.createTextNode("" + partition.getTwUserKeyMax()));
+ pe.appendChild(twUserKeyMax);
+
+ // fbMessgeKeyMin element
+ Element fbMessageKeyMin = doc.createElement("fbMessageKeyMin");
+ fbMessageKeyMin.appendChild(doc.createTextNode("" + partition.getFbMessageIdMin()));
+ pe.appendChild(fbMessageKeyMin);
+
+ // fbMessgeKeyMin element
+ Element fbMessageKeyMax = doc.createElement("fbMessageKeyMax");
+ fbMessageKeyMax.appendChild(doc.createTextNode("" + partition.getFbMessageIdMax()));
+ pe.appendChild(fbMessageKeyMax);
+
+ // twMessgeKeyMin element
+ Element twMessageKeyMin = doc.createElement("twMessageKeyMin");
+ twMessageKeyMin.appendChild(doc.createTextNode("" + partition.getTwMessageIdMin()));
+ pe.appendChild(twMessageKeyMin);
+
+ // twMessgeKeyMin element
+ Element twMessageKeyMax = doc.createElement("twMessageKeyMax");
+ twMessageKeyMax.appendChild(doc.createTextNode("" + partition.getTwMessageIdMax()));
+ pe.appendChild(twMessageKeyMax);
+
+ // twMessgeKeyMin element
+ Element numCommonUsers = doc.createElement("numCommonUsers");
+ numCommonUsers.appendChild(doc.createTextNode("" + partition.getCommonUsers()));
+ pe.appendChild(numCommonUsers);
+
+ }
+
+ public static void main(String args[]) throws Exception {
+ String confFile = "/Users/rgrove1/work/research/asterix/icde/data-gen/conf/conf.xml";
+ String outputPath = "/Users/rgrove1/work/research/asterix/icde/data-gen/output/conf-output.xml";
+ Configuration conf = getConfiguration(confFile);
+ writeToXML(conf, outputPath);
+ }
+
+ }
+
+ public static class Date {
+
+ private int day;
+ private int month;
+ private int year;
+
+ public Date(int month, int day, int year) {
+ this.month = month;
+ this.day = day;
+ this.year = year;
+ }
+
+ public void reset(int month, int day, int year) {
+ this.month = month;
+ this.day = day;
+ this.year = year;
+ }
+
+ public int getDay() {
+ return day;
+ }
+
+ public int getMonth() {
+ return month;
+ }
+
+ public int getYear() {
+ return year;
+ }
+
+ public Date() {
+ }
+
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("date");
+ builder.append("(\"");
+ builder.append(year);
+ builder.append("-");
+ builder.append(month < 10 ? "0" + month : "" + month);
+ builder.append("-");
+ builder.append(day < 10 ? "0" + day : "" + day);
+ builder.append("\")");
+ return builder.toString();
+ }
+
+ public void setDay(int day) {
+ this.day = day;
+ }
+
+ public void setMonth(int month) {
+ this.month = month;
+ }
+
+ public void setYear(int year) {
+ this.year = year;
+ }
+ }
+
+ public static class PartitionMetrics {
+
+ private final int fbMessages;
+ private final int twMessages;
+
+ private final int fbOnlyUsers;
+ private final int twitterOnlyUsers;
+ private final int commonUsers;
+
+ public PartitionMetrics(float number, String unit, int numPartitions) throws IOException {
+
+ int factor = 0;
+ if (unit.equalsIgnoreCase("MB")) {
+ factor = 1024 * 1024;
+ } else if (unit.equalsIgnoreCase("GB")) {
+ factor = 1024 * 1024 * 1024;
+ } else if (unit.equalsIgnoreCase("TB")) {
+ factor = 1024 * 1024 * 1024 * 1024;
+ } else
+ throw new IOException("Invalid unit:" + unit);
+
+ fbMessages = (int) (((number * factor * 0.80) / 258) / numPartitions);
+ twMessages = (int) (fbMessages * 1.1 / 0.35);
+
+ fbOnlyUsers = (int) ((number * factor * 0.20 * 0.0043)) / numPartitions;
+ twitterOnlyUsers = (int) (0.25 * fbOnlyUsers);
+ commonUsers = (int) (0.1 * fbOnlyUsers);
+ }
+
+ public int getFbMessages() {
+ return fbMessages;
+ }
+
+ public int getTwMessages() {
+ return twMessages;
+ }
+
+ public int getFbOnlyUsers() {
+ return fbOnlyUsers;
+ }
+
+ public int getTwitterOnlyUsers() {
+ return twitterOnlyUsers;
+ }
+
+ public int getCommonUsers() {
+ return commonUsers;
+ }
+
+ }
+
+ public static String[] lastNames = { "Hoopengarner", "Harrow", "Gardner", "Blyant", "Best", "Buttermore", "Gronko",
+ "Mayers", "Countryman", "Neely", "Ruhl", "Taggart", "Bash", "Cason", "Hil", "Zalack", "Mingle", "Carr",
+ "Rohtin", "Wardle", "Pullman", "Wire", "Kellogg", "Hiles", "Keppel", "Bratton", "Sutton", "Wickes",
+ "Muller", "Friedline", "Llora", "Elizabeth", "Anderson", "Gaskins", "Rifler", "Vinsant", "Stanfield",
+ "Black", "Guest", "Hujsak", "Carter", "Weidemann", "Hays", "Patton", "Hayhurst", "Paynter", "Cressman",
+ "Fiddler", "Evans", "Sherlock", "Woodworth", "Jackson", "Bloise", "Schneider", "Ring", "Kepplinger",
+ "James", "Moon", "Bennett", "Bashline", "Ryals", "Zeal", "Christman", "Milliron", "Nash", "Ewing", "Camp",
+ "Mason", "Richardson", "Bowchiew", "Hahn", "Wilson", "Wood", "Toyley", "Williamson", "Lafortune", "Errett",
+ "Saltser", "Hirleman", "Brindle", "Newbiggin", "Ulery", "Lambert", "Shick", "Kuster", "Moore", "Finck",
+ "Powell", "Jolce", "Townsend", "Sauter", "Cowher", "Wolfe", "Cavalet", "Porter", "Laborde", "Ballou",
+ "Murray", "Stoddard", "Pycroft", "Milne", "King", "Todd", "Staymates", "Hall", "Romanoff", "Keilbach",
+ "Sandford", "Hamilton", "Fye", "Kline", "Weeks", "Mcelroy", "Mccullough", "Bryant", "Hill", "Moore",
+ "Ledgerwood", "Prevatt", "Eckert", "Read", "Hastings", "Doverspike", "Allshouse", "Bryan", "Mccallum",
+ "Lombardi", "Mckendrick", "Cattley", "Barkley", "Steiner", "Finlay", "Priebe", "Armitage", "Hall", "Elder",
+ "Erskine", "Hatcher", "Walker", "Pearsall", "Dunkle", "Haile", "Adams", "Miller", "Newbern", "Basinger",
+ "Fuhrer", "Brinigh", "Mench", "Blackburn", "Bastion", "Mccune", "Bridger", "Hynes", "Quinn", "Courtney",
+ "Geddinge", "Field", "Seelig", "Cable", "Earhart", "Harshman", "Roby", "Beals", "Berry", "Reed", "Hector",
+ "Pittman", "Haverrman", "Kalp", "Briner", "Joghs", "Cowart", "Close", "Wynne", "Harden", "Weldy",
+ "Stephenson", "Hildyard", "Moberly", "Wells", "Mackendoerfer", "Fisher", "Oppie", "Oneal", "Churchill",
+ "Keister", "Alice", "Tavoularis", "Fisher", "Hair", "Burns", "Veith", "Wile", "Fuller", "Fields", "Clark",
+ "Randolph", "Stone", "Mcclymonds", "Holtzer", "Donkin", "Wilkinson", "Rosensteel", "Albright", "Stahl",
+ "Fox", "Kadel", "Houser", "Hanseu", "Henderson", "Davis", "Bicknell", "Swain", "Mercer", "Holdeman",
+ "Enderly", "Caesar", "Margaret", "Munshower", "Elless", "Lucy", "Feufer", "Schofield", "Graham",
+ "Blatenberger", "Benford", "Akers", "Campbell", "Ann", "Sadley", "Ling", "Gongaware", "Schmidt", "Endsley",
+ "Groah", "Flanders", "Reichard", "Lowstetter", "Sandblom", "Griffis", "Basmanoff", "Coveney", "Hawker",
+ "Archibald", "Hutton", "Barnes", "Diegel", "Raybould", "Focell", "Breitenstein", "Murray", "Chauvin",
+ "Busk", "Pheleps", "Teagarden", "Northey", "Baumgartner", "Fleming", "Harris", "Parkinson", "Carpenter",
+ "Whirlow", "Bonner", "Wortman", "Rogers", "Scott", "Lowe", "Mckee", "Huston", "Bullard", "Throckmorton",
+ "Rummel", "Mathews", "Dull", "Saline", "Tue", "Woolery", "Lalty", "Schrader", "Ramsey", "Eisenmann",
+ "Philbrick", "Sybilla", "Wallace", "Fonblanque", "Paul", "Orbell", "Higgens", "Casteel", "Franks",
+ "Demuth", "Eisenman", "Hay", "Robinson", "Fischer", "Hincken", "Wylie", "Leichter", "Bousum",
+ "Littlefield", "Mcdonald", "Greif", "Rhodes", "Wall", "Steele", "Baldwin", "Smith", "Stewart", "Schere",
+ "Mary", "Aultman", "Emrick", "Guess", "Mitchell", "Painter", "Aft", "Hasely", "Weldi", "Loewentsein",
+ "Poorbaugh", "Kepple", "Noton", "Judge", "Jackson", "Style", "Adcock", "Diller", "Marriman", "Johnston",
+ "Children", "Monahan", "Ehret", "Shaw", "Congdon", "Pinney", "Millard", "Crissman", "Tanner", "Rose",
+ "Knisely", "Cypret", "Sommer", "Poehl", "Hardie", "Bender", "Overholt", "Gottwine", "Beach", "Leslie",
+ "Trevithick", "Langston", "Magor", "Shotts", "Howe", "Hunter", "Cross", "Kistler", "Dealtry", "Christner",
+ "Pennington", "Thorley", "Eckhardstein", "Van", "Stroh", "Stough", "Stall", "Beedell", "Shea", "Garland",
+ "Mays", "Pritchard", "Frankenberger", "Rowley", "Lane", "Baum", "Alliman", "Park", "Jardine", "Butler",
+ "Cherry", "Kooser", "Baxter", "Billimek", "Downing", "Hurst", "Wood", "Baird", "Watkins", "Edwards",
+ "Kemerer", "Harding", "Owens", "Eiford", "Keener", "Garneis", "Fiscina", "Mang", "Draudy", "Mills",
+ "Gibson", "Reese", "Todd", "Ramos", "Levett", "Wilks", "Ward", "Mosser", "Dunlap", "Kifer", "Christopher",
+ "Ashbaugh", "Wynter", "Rawls", "Cribbs", "Haynes", "Thigpen", "Schreckengost", "Bishop", "Linton",
+ "Chapman", "James", "Jerome", "Hook", "Omara", "Houston", "Maclagan", "Sandys", "Pickering", "Blois",
+ "Dickson", "Kemble", "Duncan", "Woodward", "Southern", "Henley", "Treeby", "Cram", "Elsas", "Driggers",
+ "Warrick", "Overstreet", "Hindman", "Buck", "Sulyard", "Wentzel", "Swink", "Butt", "Schaeffer",
+ "Hoffhants", "Bould", "Willcox", "Lotherington", "Bagley", "Graff", "White", "Wheeler", "Sloan",
+ "Rodacker", "Hanford", "Jowers", "Kunkle", "Cass", "Powers", "Gilman", "Mcmichaels", "Hobbs", "Herndon",
+ "Prescott", "Smail", "Mcdonald", "Biery", "Orner", "Richards", "Mueller", "Isaman", "Bruxner", "Goodman",
+ "Barth", "Turzanski", "Vorrasi", "Stainforth", "Nehling", "Rahl", "Erschoff", "Greene", "Mckinnon",
+ "Reade", "Smith", "Pery", "Roose", "Greenwood", "Weisgarber", "Curry", "Holts", "Zadovsky", "Parrish",
+ "Putnam", "Munson", "Mcindoe", "Nickolson", "Brooks", "Bollinger", "Stroble", "Siegrist", "Fulton",
+ "Tomey", "Zoucks", "Roberts", "Otis", "Clarke", "Easter", "Johnson", "Fylbrigg", "Taylor", "Swartzbaugh",
+ "Weinstein", "Gadow", "Sayre", "Marcotte", "Wise", "Atweeke", "Mcfall", "Napier", "Eisenhart", "Canham",
+ "Sealis", "Baughman", "Gertraht", "Losey", "Laurence", "Eva", "Pershing", "Kern", "Pirl", "Rega",
+ "Sanborn", "Kanaga", "Sanders", "Anderson", "Dickinson", "Osteen", "Gettemy", "Crom", "Snyder", "Reed",
+ "Laurenzi", "Riggle", "Tillson", "Fowler", "Raub", "Jenner", "Koepple", "Soames", "Goldvogel", "Dimsdale",
+ "Zimmer", "Giesen", "Baker", "Beail", "Mortland", "Bard", "Sanner", "Knopsnider", "Jenkins", "Bailey",
+ "Werner", "Barrett", "Faust", "Agg", "Tomlinson", "Williams", "Little", "Greenawalt", "Wells", "Wilkins",
+ "Gisiko", "Bauerle", "Harrold", "Prechtl", "Polson", "Faast", "Winton", "Garneys", "Peters", "Potter",
+ "Porter", "Tennant", "Eve", "Dugger", "Jones", "Burch", "Cowper", "Whittier" };
+
+ public static String[] firstNames = { "Albert", "Jacquelin", "Dona", "Alia", "Mayme", "Genoveva", "Emma", "Lena",
+ "Melody", "Vilma", "Katelyn", "Jeremy", "Coral", "Leann", "Lita", "Gilda", "Kayla", "Alvina", "Maranda",
+ "Verlie", "Khadijah", "Karey", "Patrice", "Kallie", "Corey", "Mollie", "Daisy", "Melanie", "Sarita",
+ "Nichole", "Pricilla", "Terresa", "Berneice", "Arianne", "Brianne", "Lavinia", "Ulrike", "Lesha", "Adell",
+ "Ardelle", "Marisha", "Laquita", "Karyl", "Maryjane", "Kendall", "Isobel", "Raeann", "Heike", "Barbera",
+ "Norman", "Yasmine", "Nevada", "Mariam", "Edith", "Eugena", "Lovie", "Maren", "Bennie", "Lennie", "Tamera",
+ "Crystal", "Randi", "Anamaria", "Chantal", "Jesenia", "Avis", "Shela", "Randy", "Laurena", "Sharron",
+ "Christiane", "Lorie", "Mario", "Elizabeth", "Reina", "Adria", "Lakisha", "Brittni", "Azzie", "Dori",
+ "Shaneka", "Asuncion", "Katheryn", "Laurice", "Sharita", "Krystal", "Reva", "Inger", "Alpha", "Makeda",
+ "Anabel", "Loni", "Tiara", "Meda", "Latashia", "Leola", "Chin", "Daisey", "Ivory", "Amalia", "Logan",
+ "Tyler", "Kyong", "Carolann", "Maryetta", "Eufemia", "Anya", "Doreatha", "Lorna", "Rutha", "Ehtel",
+ "Debbie", "Chassidy", "Sang", "Christa", "Lottie", "Chun", "Karine", "Peggie", "Amina", "Melany", "Alayna",
+ "Scott", "Romana", "Naomi", "Christiana", "Salena", "Taunya", "Mitsue", "Regina", "Chelsie", "Charity",
+ "Dacia", "Aletha", "Latosha", "Lia", "Tamica", "Chery", "Bianca", "Shu", "Georgianne", "Myriam", "Austin",
+ "Wan", "Mallory", "Jana", "Georgie", "Jenell", "Kori", "Vicki", "Delfina", "June", "Mellisa", "Catherina",
+ "Claudie", "Tynisha", "Dayle", "Enriqueta", "Belen", "Pia", "Sarai", "Rosy", "Renay", "Kacie", "Frieda",
+ "Cayla", "Elissa", "Claribel", "Sabina", "Mackenzie", "Raina", "Cira", "Mitzie", "Aubrey", "Serafina",
+ "Maria", "Katharine", "Esperanza", "Sung", "Daria", "Billye", "Stefanie", "Kasha", "Holly", "Suzanne",
+ "Inga", "Flora", "Andria", "Genevie", "Eladia", "Janet", "Erline", "Renna", "Georgeanna", "Delorse",
+ "Elnora", "Rudy", "Rima", "Leanora", "Letisha", "Love", "Alverta", "Pinkie", "Domonique", "Jeannie",
+ "Jose", "Jacqueline", "Tara", "Lily", "Erna", "Tennille", "Galina", "Tamala", "Kirby", "Nichelle",
+ "Myesha", "Farah", "Santa", "Ludie", "Kenia", "Yee", "Micheline", "Maryann", "Elaina", "Ethelyn",
+ "Emmaline", "Shanell", "Marina", "Nila", "Alane", "Shakira", "Dorris", "Belinda", "Elois", "Barbie",
+ "Carita", "Gisela", "Lura", "Fransisca", "Helga", "Peg", "Leonarda", "Earlie", "Deetta", "Jacquetta",
+ "Blossom", "Kayleigh", "Deloras", "Keshia", "Christinia", "Dulce", "Bernie", "Sheba", "Lashanda", "Tula",
+ "Claretta", "Kary", "Jeanette", "Lupita", "Lenora", "Hisako", "Sherise", "Glynda", "Adela", "Chia",
+ "Sudie", "Mindy", "Caroyln", "Lindsey", "Xiomara", "Mercedes", "Onie", "Loan", "Alexis", "Tommie",
+ "Donette", "Monica", "Soo", "Camellia", "Lavera", "Valery", "Ariana", "Sophia", "Loris", "Ginette",
+ "Marielle", "Tari", "Julissa", "Alesia", "Suzanna", "Emelda", "Erin", "Ladawn", "Sherilyn", "Candice",
+ "Nereida", "Fairy", "Carl", "Joel", "Marilee", "Gracia", "Cordie", "So", "Shanita", "Drew", "Cassie",
+ "Sherie", "Marget", "Norma", "Delois", "Debera", "Chanelle", "Catarina", "Aracely", "Carlene", "Tricia",
+ "Aleen", "Katharina", "Marguerita", "Guadalupe", "Margorie", "Mandie", "Kathe", "Chong", "Sage", "Faith",
+ "Maryrose", "Stephany", "Ivy", "Pauline", "Susie", "Cristen", "Jenifer", "Annette", "Debi", "Karmen",
+ "Luci", "Shayla", "Hope", "Ocie", "Sharie", "Tami", "Breana", "Kerry", "Rubye", "Lashay", "Sondra",
+ "Katrice", "Brunilda", "Cortney", "Yan", "Zenobia", "Penni", "Addie", "Lavona", "Noel", "Anika",
+ "Herlinda", "Valencia", "Bunny", "Tory", "Victoria", "Carrie", "Mikaela", "Wilhelmina", "Chung",
+ "Hortencia", "Gerda", "Wen", "Ilana", "Sibyl", "Candida", "Victorina", "Chantell", "Casie", "Emeline",
+ "Dominica", "Cecila", "Delora", "Miesha", "Nova", "Sally", "Ronald", "Charlette", "Francisca", "Mina",
+ "Jenna", "Loraine", "Felisa", "Lulu", "Page", "Lyda", "Babara", "Flor", "Walter", "Chan", "Sherika",
+ "Kala", "Luna", "Vada", "Syreeta", "Slyvia", "Karin", "Renata", "Robbi", "Glenda", "Delsie", "Lizzie",
+ "Genia", "Caitlin", "Bebe", "Cory", "Sam", "Leslee", "Elva", "Caren", "Kasie", "Leticia", "Shannan",
+ "Vickey", "Sandie", "Kyle", "Chang", "Terrilyn", "Sandra", "Elida", "Marketta", "Elsy", "Tu", "Carman",
+ "Ashlie", "Vernia", "Albertine", "Vivian", "Elba", "Bong", "Margy", "Janetta", "Xiao", "Teofila", "Danyel",
+ "Nickole", "Aleisha", "Tera", "Cleotilde", "Dara", "Paulita", "Isela", "Maricela", "Rozella", "Marivel",
+ "Aurora", "Melissa", "Carylon", "Delinda", "Marvella", "Candelaria", "Deidre", "Tawanna", "Myrtie",
+ "Milagro", "Emilie", "Coretta", "Ivette", "Suzann", "Ammie", "Lucina", "Lory", "Tena", "Eleanor",
+ "Cherlyn", "Tiana", "Brianna", "Myra", "Flo", "Carisa", "Kandi", "Erlinda", "Jacqulyn", "Fermina", "Riva",
+ "Palmira", "Lindsay", "Annmarie", "Tamiko", "Carline", "Amelia", "Quiana", "Lashawna", "Veola", "Belva",
+ "Marsha", "Verlene", "Alex", "Leisha", "Camila", "Mirtha", "Melva", "Lina", "Arla", "Cythia", "Towanda",
+ "Aracelis", "Tasia", "Aurore", "Trinity", "Bernadine", "Farrah", "Deneen", "Ines", "Betty", "Lorretta",
+ "Dorethea", "Hertha", "Rochelle", "Juli", "Shenika", "Yung", "Lavon", "Deeanna", "Nakia", "Lynnette",
+ "Dinorah", "Nery", "Elene", "Carolee", "Mira", "Franchesca", "Lavonda", "Leida", "Paulette", "Dorine",
+ "Allegra", "Keva", "Jeffrey", "Bernardina", "Maryln", "Yoko", "Faviola", "Jayne", "Lucilla", "Charita",
+ "Ewa", "Ella", "Maggie", "Ivey", "Bettie", "Jerri", "Marni", "Bibi", "Sabrina", "Sarah", "Marleen",
+ "Katherin", "Remona", "Jamika", "Antonina", "Oliva", "Lajuana", "Fonda", "Sigrid", "Yael", "Billi",
+ "Verona", "Arminda", "Mirna", "Tesha", "Katheleen", "Bonita", "Kamilah", "Patrica", "Julio", "Shaina",
+ "Mellie", "Denyse", "Deandrea", "Alena", "Meg", "Kizzie", "Krissy", "Karly", "Alleen", "Yahaira", "Lucie",
+ "Karena", "Elaine", "Eloise", "Buena", "Marianela", "Renee", "Nan", "Carolynn", "Windy", "Avril", "Jane",
+ "Vida", "Thea", "Marvel", "Rosaline", "Tifany", "Robena", "Azucena", "Carlota", "Mindi", "Andera", "Jenny",
+ "Courtney", "Lyndsey", "Willette", "Kristie", "Shaniqua", "Tabatha", "Ngoc", "Una", "Marlena", "Louetta",
+ "Vernie", "Brandy", "Jacquelyne", "Jenelle", "Elna", "Erminia", "Ida", "Audie", "Louis", "Marisol",
+ "Shawana", "Harriette", "Karol", "Kitty", "Esmeralda", "Vivienne", "Eloisa", "Iris", "Jeanice", "Cammie",
+ "Jacinda", "Shena", "Floy", "Theda", "Lourdes", "Jayna", "Marg", "Kati", "Tanna", "Rosalyn", "Maxima",
+ "Soon", "Angelika", "Shonna", "Merle", "Kassandra", "Deedee", "Heidi", "Marti", "Renae", "Arleen",
+ "Alfredia", "Jewell", "Carley", "Pennie", "Corina", "Tonisha", "Natividad", "Lilliana", "Darcie", "Shawna",
+ "Angel", "Piedad", "Josefa", "Rebbeca", "Natacha", "Nenita", "Petrina", "Carmon", "Chasidy", "Temika",
+ "Dennise", "Renetta", "Augusta", "Shirlee", "Valeri", "Casimira", "Janay", "Berniece", "Deborah", "Yaeko",
+ "Mimi", "Digna", "Irish", "Cher", "Yong", "Lucila", "Jimmie", "Junko", "Lezlie", "Waneta", "Sandee",
+ "Marquita", "Eura", "Freeda", "Annabell", "Laree", "Jaye", "Wendy", "Toshia", "Kylee", "Aleta", "Emiko",
+ "Clorinda", "Sixta", "Audrea", "Juanita", "Birdie", "Reita", "Latanya", "Nia", "Leora", "Laurine",
+ "Krysten", "Jerrie", "Chantel", "Ira", "Sena", "Andre", "Jann", "Marla", "Precious", "Katy", "Gabrielle",
+ "Yvette", "Brook", "Shirlene", "Eldora", "Laura", "Milda", "Euna", "Jettie", "Debora", "Lise", "Edythe",
+ "Leandra", "Shandi", "Araceli", "Johanne", "Nieves", "Denese", "Carmelita", "Nohemi", "Annice", "Natalie",
+ "Yolande", "Jeffie", "Vashti", "Vickie", "Obdulia", "Youlanda", "Lupe", "Tomoko", "Monserrate", "Domitila",
+ "Etsuko", "Adrienne", "Lakesha", "Melissia", "Odessa", "Meagan", "Veronika", "Jolyn", "Isabelle", "Leah",
+ "Rhiannon", "Gianna", "Audra", "Sommer", "Renate", "Perla", "Thao", "Myong", "Lavette", "Mark", "Emilia",
+ "Ariane", "Karl", "Dorie", "Jacquie", "Mia", "Malka", "Shenita", "Tashina", "Christine", "Cherri", "Roni",
+ "Fran", "Mildred", "Sara", "Clarissa", "Fredia", "Elease", "Samuel", "Earlene", "Vernita", "Mae", "Concha",
+ "Renea", "Tamekia", "Hye", "Ingeborg", "Tessa", "Kelly", "Kristin", "Tam", "Sacha", "Kanisha", "Jillian",
+ "Tiffanie", "Ashlee", "Madelyn", "Donya", "Clementine", "Mickie", "My", "Zena", "Terrie", "Samatha",
+ "Gertie", "Tarra", "Natalia", "Sharlene", "Evie", "Shalon", "Rosalee", "Numbers", "Jodi", "Hattie",
+ "Naoma", "Valene", "Whitley", "Claude", "Alline", "Jeanne", "Camie", "Maragret", "Viola", "Kris", "Marlo",
+ "Arcelia", "Shari", "Jalisa", "Corrie", "Eleonor", "Angelyn", "Merry", "Lauren", "Melita", "Gita",
+ "Elenor", "Aurelia", "Janae", "Lyndia", "Margeret", "Shawanda", "Rolande", "Shirl", "Madeleine", "Celinda",
+ "Jaleesa", "Shemika", "Joye", "Tisa", "Trudie", "Kathrine", "Clarita", "Dinah", "Georgia", "Antoinette",
+ "Janis", "Suzette", "Sherri", "Herta", "Arie", "Hedy", "Cassi", "Audrie", "Caryl", "Jazmine", "Jessica",
+ "Beverly", "Elizbeth", "Marylee", "Londa", "Fredericka", "Argelia", "Nana", "Donnette", "Damaris",
+ "Hailey", "Jamee", "Kathlene", "Glayds", "Lydia", "Apryl", "Verla", "Adam", "Concepcion", "Zelda",
+ "Shonta", "Vernice", "Detra", "Meghann", "Sherley", "Sheri", "Kiyoko", "Margarita", "Adaline", "Mariela",
+ "Velda", "Ailene", "Juliane", "Aiko", "Edyth", "Cecelia", "Shavon", "Florance", "Madeline", "Rheba",
+ "Deann", "Ignacia", "Odelia", "Heide", "Mica", "Jennette", "Maricruz", "Ouida", "Darcy", "Laure",
+ "Justina", "Amada", "Laine", "Cruz", "Sunny", "Francene", "Roxanna", "Nam", "Nancie", "Deanna", "Letty",
+ "Britni", "Kazuko", "Lacresha", "Simon", "Caleb", "Milton", "Colton", "Travis", "Miles", "Jonathan",
+ "Logan", "Rolf", "Emilio", "Roberto", "Marcus", "Tim", "Delmar", "Devon", "Kurt", "Edward", "Jeffrey",
+ "Elvis", "Alfonso", "Blair", "Wm", "Sheldon", "Leonel", "Michal", "Federico", "Jacques", "Leslie",
+ "Augustine", "Hugh", "Brant", "Hong", "Sal", "Modesto", "Curtis", "Jefferey", "Adam", "John", "Glenn",
+ "Vance", "Alejandro", "Refugio", "Lucio", "Demarcus", "Chang", "Huey", "Neville", "Preston", "Bert",
+ "Abram", "Foster", "Jamison", "Kirby", "Erich", "Manual", "Dustin", "Derrick", "Donnie", "Jospeh", "Chris",
+ "Josue", "Stevie", "Russ", "Stanley", "Nicolas", "Samuel", "Waldo", "Jake", "Max", "Ernest", "Reinaldo",
+ "Rene", "Gale", "Morris", "Nathan", "Maximo", "Courtney", "Theodore", "Octavio", "Otha", "Delmer",
+ "Graham", "Dean", "Lowell", "Myles", "Colby", "Boyd", "Adolph", "Jarrod", "Nick", "Mark", "Clinton", "Kim",
+ "Sonny", "Dalton", "Tyler", "Jody", "Orville", "Luther", "Rubin", "Hollis", "Rashad", "Barton", "Vicente",
+ "Ted", "Rick", "Carmine", "Clifton", "Gayle", "Christopher", "Jessie", "Bradley", "Clay", "Theo", "Josh",
+ "Mitchell", "Boyce", "Chung", "Eugenio", "August", "Norbert", "Sammie", "Jerry", "Adan", "Edmundo",
+ "Homer", "Hilton", "Tod", "Kirk", "Emmett", "Milan", "Quincy", "Jewell", "Herb", "Steve", "Carmen",
+ "Bobby", "Odis", "Daron", "Jeremy", "Carl", "Hunter", "Tuan", "Thurman", "Asa", "Brenton", "Shane",
+ "Donny", "Andreas", "Teddy", "Dario", "Cyril", "Hoyt", "Teodoro", "Vincenzo", "Hilario", "Daren",
+ "Agustin", "Marquis", "Ezekiel", "Brendan", "Johnson", "Alden", "Richie", "Granville", "Chad", "Joseph",
+ "Lamont", "Jordon", "Gilberto", "Chong", "Rosendo", "Eddy", "Rob", "Dewitt", "Andre", "Titus", "Russell",
+ "Rigoberto", "Dick", "Garland", "Gabriel", "Hank", "Darius", "Ignacio", "Lazaro", "Johnie", "Mauro",
+ "Edmund", "Trent", "Harris", "Osvaldo", "Marvin", "Judson", "Rodney", "Randall", "Renato", "Richard",
+ "Denny", "Jon", "Doyle", "Cristopher", "Wilson", "Christian", "Jamie", "Roland", "Ken", "Tad", "Romeo",
+ "Seth", "Quinton", "Byron", "Ruben", "Darrel", "Deandre", "Broderick", "Harold", "Ty", "Monroe", "Landon",
+ "Mohammed", "Angel", "Arlen", "Elias", "Andres", "Carlton", "Numbers", "Tony", "Thaddeus", "Issac",
+ "Elmer", "Antoine", "Ned", "Fermin", "Grover", "Benito", "Abdul", "Cortez", "Eric", "Maxwell", "Coy",
+ "Gavin", "Rich", "Andy", "Del", "Giovanni", "Major", "Efren", "Horacio", "Joaquin", "Charles", "Noah",
+ "Deon", "Pasquale", "Reed", "Fausto", "Jermaine", "Irvin", "Ray", "Tobias", "Carter", "Yong", "Jorge",
+ "Brent", "Daniel", "Zane", "Walker", "Thad", "Shaun", "Jaime", "Mckinley", "Bradford", "Nathanial",
+ "Jerald", "Aubrey", "Virgil", "Abel", "Philip", "Chester", "Chadwick", "Dominick", "Britt", "Emmitt",
+ "Ferdinand", "Julian", "Reid", "Santos", "Dwain", "Morgan", "James", "Marion", "Micheal", "Eddie", "Brett",
+ "Stacy", "Kerry", "Dale", "Nicholas", "Darrick", "Freeman", "Scott", "Newton", "Sherman", "Felton",
+ "Cedrick", "Winfred", "Brad", "Fredric", "Dewayne", "Virgilio", "Reggie", "Edgar", "Heriberto", "Shad",
+ "Timmy", "Javier", "Nestor", "Royal", "Lynn", "Irwin", "Ismael", "Jonas", "Wiley", "Austin", "Kieth",
+ "Gonzalo", "Paris", "Earnest", "Arron", "Jarred", "Todd", "Erik", "Maria", "Chauncey", "Neil", "Conrad",
+ "Maurice", "Roosevelt", "Jacob", "Sydney", "Lee", "Basil", "Louis", "Rodolfo", "Rodger", "Roman", "Corey",
+ "Ambrose", "Cristobal", "Sylvester", "Benton", "Franklin", "Marcelo", "Guillermo", "Toby", "Jeramy",
+ "Donn", "Danny", "Dwight", "Clifford", "Valentine", "Matt", "Jules", "Kareem", "Ronny", "Lonny", "Son",
+ "Leopoldo", "Dannie", "Gregg", "Dillon", "Orlando", "Weston", "Kermit", "Damian", "Abraham", "Walton",
+ "Adrian", "Rudolf", "Will", "Les", "Norberto", "Fred", "Tyrone", "Ariel", "Terry", "Emmanuel", "Anderson",
+ "Elton", "Otis", "Derek", "Frankie", "Gino", "Lavern", "Jarod", "Kenny", "Dane", "Keenan", "Bryant",
+ "Eusebio", "Dorian", "Ali", "Lucas", "Wilford", "Jeremiah", "Warner", "Woodrow", "Galen", "Bob",
+ "Johnathon", "Amado", "Michel", "Harry", "Zachery", "Taylor", "Booker", "Hershel", "Mohammad", "Darrell",
+ "Kyle", "Stuart", "Marlin", "Hyman", "Jeffery", "Sidney", "Merrill", "Roy", "Garrett", "Porter", "Kenton",
+ "Giuseppe", "Terrance", "Trey", "Felix", "Buster", "Von", "Jackie", "Linwood", "Darron", "Francisco",
+ "Bernie", "Diego", "Brendon", "Cody", "Marco", "Ahmed", "Antonio", "Vince", "Brooks", "Kendrick", "Ross",
+ "Mohamed", "Jim", "Benny", "Gerald", "Pablo", "Charlie", "Antony", "Werner", "Hipolito", "Minh", "Mel",
+ "Derick", "Armand", "Fidel", "Lewis", "Donnell", "Desmond", "Vaughn", "Guadalupe", "Keneth", "Rodrick",
+ "Spencer", "Chas", "Gus", "Harlan", "Wes", "Carmelo", "Jefferson", "Gerard", "Jarvis", "Haywood", "Hayden",
+ "Sergio", "Gene", "Edgardo", "Colin", "Horace", "Dominic", "Aldo", "Adolfo", "Juan", "Man", "Lenard",
+ "Clement", "Everett", "Hal", "Bryon", "Mason", "Emerson", "Earle", "Laurence", "Columbus", "Lamar",
+ "Douglas", "Ian", "Fredrick", "Marc", "Loren", "Wallace", "Randell", "Noble", "Ricardo", "Rory", "Lindsey",
+ "Boris", "Bill", "Carlos", "Domingo", "Grant", "Craig", "Ezra", "Matthew", "Van", "Rudy", "Danial",
+ "Brock", "Maynard", "Vincent", "Cole", "Damion", "Ellsworth", "Marcel", "Markus", "Rueben", "Tanner",
+ "Reyes", "Hung", "Kennith", "Lindsay", "Howard", "Ralph", "Jed", "Monte", "Garfield", "Avery", "Bernardo",
+ "Malcolm", "Sterling", "Ezequiel", "Kristofer", "Luciano", "Casey", "Rosario", "Ellis", "Quintin",
+ "Trevor", "Miquel", "Jordan", "Arthur", "Carson", "Tyron", "Grady", "Walter", "Jonathon", "Ricky",
+ "Bennie", "Terrence", "Dion", "Dusty", "Roderick", "Isaac", "Rodrigo", "Harrison", "Zack", "Dee", "Devin",
+ "Rey", "Ulysses", "Clint", "Greg", "Dino", "Frances", "Wade", "Franklyn", "Jude", "Bradly", "Salvador",
+ "Rocky", "Weldon", "Lloyd", "Milford", "Clarence", "Alec", "Allan", "Bobbie", "Oswaldo", "Wilfred",
+ "Raleigh", "Shelby", "Willy", "Alphonso", "Arnoldo", "Robbie", "Truman", "Nicky", "Quinn", "Damien",
+ "Lacy", "Marcos", "Parker", "Burt", "Carroll", "Denver", "Buck", "Dong", "Normand", "Billie", "Edwin",
+ "Troy", "Arden", "Rusty", "Tommy", "Kenneth", "Leo", "Claud", "Joel", "Kendall", "Dante", "Milo", "Cruz",
+ "Lucien", "Ramon", "Jarrett", "Scottie", "Deshawn", "Ronnie", "Pete", "Alonzo", "Whitney", "Stefan",
+ "Sebastian", "Edmond", "Enrique", "Branden", "Leonard", "Loyd", "Olin", "Ron", "Rhett", "Frederic",
+ "Orval", "Tyrell", "Gail", "Eli", "Antonia", "Malcom", "Sandy", "Stacey", "Nickolas", "Hosea", "Santo",
+ "Oscar", "Fletcher", "Dave", "Patrick", "Dewey", "Bo", "Vito", "Blaine", "Randy", "Robin", "Winston",
+ "Sammy", "Edwardo", "Manuel", "Valentin", "Stanford", "Filiberto", "Buddy", "Zachariah", "Johnnie",
+ "Elbert", "Paul", "Isreal", "Jerrold", "Leif", "Owen", "Sung", "Junior", "Raphael", "Josef", "Donte",
+ "Allen", "Florencio", "Raymond", "Lauren", "Collin", "Eliseo", "Bruno", "Martin", "Lyndon", "Kurtis",
+ "Salvatore", "Erwin", "Michael", "Sean", "Davis", "Alberto", "King", "Rolland", "Joe", "Tory", "Chase",
+ "Dallas", "Vernon", "Beau", "Terrell", "Reynaldo", "Monty", "Jame", "Dirk", "Florentino", "Reuben", "Saul",
+ "Emory", "Esteban", "Michale", "Claudio", "Jacinto", "Kelley", "Levi", "Andrea", "Lanny", "Wendell",
+ "Elwood", "Joan", "Felipe", "Palmer", "Elmo", "Lawrence", "Hubert", "Rudolph", "Duane", "Cordell",
+ "Everette", "Mack", "Alan", "Efrain", "Trenton", "Bryan", "Tom", "Wilmer", "Clyde", "Chance", "Lou",
+ "Brain", "Justin", "Phil", "Jerrod", "George", "Kris", "Cyrus", "Emery", "Rickey", "Lincoln", "Renaldo",
+ "Mathew", "Luke", "Dwayne", "Alexis", "Jackson", "Gil", "Marty", "Burton", "Emil", "Glen", "Willian",
+ "Clemente", "Keven", "Barney", "Odell", "Reginald", "Aurelio", "Damon", "Ward", "Gustavo", "Harley",
+ "Peter", "Anibal", "Arlie", "Nigel", "Oren", "Zachary", "Scot", "Bud", "Wilbert", "Bart", "Josiah",
+ "Marlon", "Eldon", "Darryl", "Roger", "Anthony", "Omer", "Francis", "Patricia", "Moises", "Chuck",
+ "Waylon", "Hector", "Jamaal", "Cesar", "Julius", "Rex", "Norris", "Ollie", "Isaias", "Quentin", "Graig",
+ "Lyle", "Jeffry", "Karl", "Lester", "Danilo", "Mike", "Dylan", "Carlo", "Ryan", "Leon", "Percy", "Lucius",
+ "Jamel", "Lesley", "Joey", "Cornelius", "Rico", "Arnulfo", "Chet", "Margarito", "Ernie", "Nathanael",
+ "Amos", "Cleveland", "Luigi", "Alfonzo", "Phillip", "Clair", "Elroy", "Alva", "Hans", "Shon", "Gary",
+ "Jesus", "Cary", "Silas", "Keith", "Israel", "Willard", "Randolph", "Dan", "Adalberto", "Claude",
+ "Delbert", "Garry", "Mary", "Larry", "Riley", "Robt", "Darwin", "Barrett", "Steven", "Kelly", "Herschel",
+ "Darnell", "Scotty", "Armando", "Miguel", "Lawerence", "Wesley", "Garth", "Carol", "Micah", "Alvin",
+ "Billy", "Earl", "Pat", "Brady", "Cory", "Carey", "Bernard", "Jayson", "Nathaniel", "Gaylord", "Archie",
+ "Dorsey", "Erasmo", "Angelo", "Elisha", "Long", "Augustus", "Hobert", "Drew", "Stan", "Sherwood",
+ "Lorenzo", "Forrest", "Shawn", "Leigh", "Hiram", "Leonardo", "Gerry", "Myron", "Hugo", "Alvaro", "Leland",
+ "Genaro", "Jamey", "Stewart", "Elden", "Irving", "Olen", "Antone", "Freddy", "Lupe", "Joshua", "Gregory",
+ "Andrew", "Sang", "Wilbur", "Gerardo", "Merlin", "Williams", "Johnny", "Alex", "Tommie", "Jimmy",
+ "Donovan", "Dexter", "Gaston", "Tracy", "Jeff", "Stephen", "Berry", "Anton", "Darell", "Fritz", "Willis",
+ "Noel", "Mariano", "Crawford", "Zoey", "Alex", "Brianna", "Carlie", "Lloyd", "Cal", "Astor", "Randolf",
+ "Magdalene", "Trevelyan", "Terance", "Roy", "Kermit", "Harriett", "Crystal", "Laurinda", "Kiersten",
+ "Phyllida", "Liz", "Bettie", "Rena", "Colten", "Berenice", "Sindy", "Wilma", "Amos", "Candi", "Ritchie",
+ "Dirk", "Kathlyn", "Callista", "Anona", "Flossie", "Sterling", "Calista", "Regan", "Erica", "Jeana",
+ "Keaton", "York", "Nolan", "Daniel", "Benton", "Tommie", "Serenity", "Deanna", "Chas", "Heron", "Marlyn",
+ "Xylia", "Tristin", "Lyndon", "Andriana", "Madelaine", "Maddison", "Leila", "Chantelle", "Audrey",
+ "Connor", "Daley", "Tracee", "Tilda", "Eliot", "Merle", "Linwood", "Kathryn", "Silas", "Alvina",
+ "Phinehas", "Janis", "Alvena", "Zubin", "Gwendolen", "Caitlyn", "Bertram", "Hailee", "Idelle", "Homer",
+ "Jannah", "Delbert", "Rhianna", "Cy", "Jefferson", "Wayland", "Nona", "Tempest", "Reed", "Jenifer",
+ "Ellery", "Nicolina", "Aldous", "Prince", "Lexia", "Vinnie", "Doug", "Alberic", "Kayleen", "Woody",
+ "Rosanne", "Ysabel", "Skyler", "Twyla", "Geordie", "Leta", "Clive", "Aaron", "Scottie", "Celeste", "Chuck",
+ "Erle", "Lallie", "Jaycob", "Ray", "Carrie", "Laurita", "Noreen", "Meaghan", "Ulysses", "Andy", "Drogo",
+ "Dina", "Yasmin", "Mya", "Luvenia", "Urban", "Jacob", "Laetitia", "Sherry", "Love", "Michaela", "Deonne",
+ "Summer", "Brendon", "Sheena", "Mason", "Jayson", "Linden", "Salal", "Darrell", "Diana", "Hudson",
+ "Lennon", "Isador", "Charley", "April", "Ralph", "James", "Mina", "Jolyon", "Laurine", "Monna", "Carita",
+ "Munro", "Elsdon", "Everette", "Radclyffe", "Darrin", "Herbert", "Gawain", "Sheree", "Trudy", "Emmaline",
+ "Kassandra", "Rebecca", "Basil", "Jen", "Don", "Osborne", "Lilith", "Hannah", "Fox", "Rupert", "Paulene",
+ "Darius", "Wally", "Baptist", "Sapphire", "Tia", "Sondra", "Kylee", "Ashton", "Jepson", "Joetta", "Val",
+ "Adela", "Zacharias", "Zola", "Marmaduke", "Shannah", "Posie", "Oralie", "Brittany", "Ernesta", "Raymund",
+ "Denzil", "Daren", "Roosevelt", "Nelson", "Fortune", "Mariel", "Nick", "Jaden", "Upton", "Oz", "Margaux",
+ "Precious", "Albert", "Bridger", "Jimmy", "Nicola", "Rosalynne", "Keith", "Walt", "Della", "Joanna",
+ "Xenia", "Esmeralda", "Major", "Simon", "Rexana", "Stacy", "Calanthe", "Sherley", "Kaitlyn", "Graham",
+ "Ramsey", "Abbey", "Madlyn", "Kelvin", "Bill", "Rue", "Monica", "Caileigh", "Laraine", "Booker", "Jayna",
+ "Greta", "Jervis", "Sherman", "Kendrick", "Tommy", "Iris", "Geffrey", "Kaelea", "Kerr", "Garrick", "Jep",
+ "Audley", "Nic", "Bronte", "Beulah", "Patricia", "Jewell", "Deidra", "Cory", "Everett", "Harper",
+ "Charity", "Godfrey", "Jaime", "Sinclair", "Talbot", "Dayna", "Cooper", "Rosaline", "Jennie", "Eileen",
+ "Latanya", "Corinna", "Roxie", "Caesar", "Charles", "Pollie", "Lindsey", "Sorrel", "Dwight", "Jocelyn",
+ "Weston", "Shyla", "Valorie", "Bessie", "Josh", "Lessie", "Dayton", "Kathi", "Chasity", "Wilton", "Adam",
+ "William", "Ash", "Angela", "Ivor", "Ria", "Jazmine", "Hailey", "Jo", "Silvestra", "Ernie", "Clifford",
+ "Levi", "Matilda", "Quincey", "Camilla", "Delicia", "Phemie", "Laurena", "Bambi", "Lourdes", "Royston",
+ "Chastity", "Lynwood", "Elle", "Brenda", "Phoebe", "Timothy", "Raschelle", "Lilly", "Burt", "Rina",
+ "Rodney", "Maris", "Jaron", "Wilf", "Harlan", "Audra", "Vincent", "Elwyn", "Drew", "Wynter", "Ora",
+ "Lissa", "Virgil", "Xavier", "Chad", "Ollie", "Leyton", "Karolyn", "Skye", "Roni", "Gladys", "Dinah",
+ "Penny", "August", "Osmund", "Whitaker", "Brande", "Cornell", "Phil", "Zara", "Kilie", "Gavin", "Coty",
+ "Randy", "Teri", "Keira", "Pru", "Clemency", "Kelcey", "Nevil", "Poppy", "Gareth", "Christabel", "Bastian",
+ "Wynonna", "Roselyn", "Goddard", "Collin", "Trace", "Neal", "Effie", "Denys", "Virginia", "Richard",
+ "Isiah", "Harrietta", "Gaylord", "Diamond", "Trudi", "Elaine", "Jemmy", "Gage", "Annabel", "Quincy", "Syd",
+ "Marianna", "Philomena", "Aubree", "Kathie", "Jacki", "Kelley", "Bess", "Cecil", "Maryvonne", "Kassidy",
+ "Anselm", "Dona", "Darby", "Jamison", "Daryl", "Darell", "Teal", "Lennie", "Bartholomew", "Katie",
+ "Maybelline", "Kimball", "Elvis", "Les", "Flick", "Harley", "Beth", "Bidelia", "Montague", "Helen", "Ozzy",
+ "Stef", "Debra", "Maxene", "Stefanie", "Russ", "Avril", "Johnathan", "Orson", "Chelsey", "Josephine",
+ "Deshaun", "Wendell", "Lula", "Ferdinanda", "Greg", "Brad", "Kynaston", "Dena", "Russel", "Robertina",
+ "Misti", "Leon", "Anjelica", "Bryana", "Myles", "Judi", "Curtis", "Davin", "Kristia", "Chrysanta",
+ "Hayleigh", "Hector", "Osbert", "Eustace", "Cary", "Tansy", "Cayley", "Maryann", "Alissa", "Ike",
+ "Tranter", "Reina", "Alwilda", "Sidony", "Columbine", "Astra", "Jillie", "Stephania", "Jonah", "Kennedy",
+ "Ferdinand", "Allegria", "Donella", "Kelleigh", "Darian", "Eldreda", "Jayden", "Herbie", "Jake", "Winston",
+ "Vi", "Annie", "Cherice", "Hugo", "Tricia", "Haydee", "Cassarah", "Darden", "Mallory", "Alton", "Hadley",
+ "Romayne", "Lacey", "Ern", "Alayna", "Cecilia", "Seward", "Tilly", "Edgar", "Concordia", "Ibbie", "Dahlia",
+ "Oswin", "Stu", "Brett", "Maralyn", "Kristeen", "Dotty", "Robyn", "Nessa", "Tresha", "Guinevere",
+ "Emerson", "Haze", "Lyn", "Henderson", "Lexa", "Jaylen", "Gail", "Lizette", "Tiara", "Robbie", "Destiny",
+ "Alice", "Livia", "Rosy", "Leah", "Jan", "Zach", "Vita", "Gia", "Micheal", "Rowina", "Alysha", "Bobbi",
+ "Delores", "Osmond", "Karaugh", "Wilbur", "Kasandra", "Renae", "Kaety", "Dora", "Gaye", "Amaryllis",
+ "Katelyn", "Dacre", "Prudence", "Ebony", "Camron", "Jerrold", "Vivyan", "Randall", "Donna", "Misty",
+ "Damon", "Selby", "Esmund", "Rian", "Garry", "Julius", "Raelene", "Clement", "Dom", "Tibby", "Moss",
+ "Millicent", "Gwendoline", "Berry", "Ashleigh", "Lilac", "Quin", "Vere", "Creighton", "Harriet", "Malvina",
+ "Lianne", "Pearle", "Kizzie", "Kara", "Petula", "Jeanie", "Maria", "Pacey", "Victoria", "Huey", "Toni",
+ "Rose", "Wallis", "Diggory", "Josiah", "Delma", "Keysha", "Channing", "Prue", "Lee", "Ryan", "Sidney",
+ "Valerie", "Clancy", "Ezra", "Gilbert", "Clare", "Laz", "Crofton", "Mike", "Annabella", "Tara", "Eldred",
+ "Arthur", "Jaylon", "Peronel", "Paden", "Dot", "Marian", "Amyas", "Alexus", "Esmond", "Abbie", "Stanley",
+ "Brittani", "Vickie", "Errol", "Kimberlee", "Uland", "Ebenezer", "Howie", "Eveline", "Andrea", "Trish",
+ "Hopkin", "Bryanna", "Temperance", "Valarie", "Femie", "Alix", "Terrell", "Lewin", "Lorrin", "Happy",
+ "Micah", "Rachyl", "Sloan", "Gertrude", "Elizabeth", "Dorris", "Andra", "Bram", "Gary", "Jeannine",
+ "Maurene", "Irene", "Yolonda", "Jonty", "Coleen", "Cecelia", "Chantal", "Stuart", "Caris", "Ros",
+ "Kaleigh", "Mirabelle", "Kolby", "Primrose", "Susannah", "Ginny", "Jinny", "Dolly", "Lettice", "Sonny",
+ "Melva", "Ernest", "Garret", "Reagan", "Trenton", "Gallagher", "Edwin", "Nikolas", "Corrie", "Lynette",
+ "Ettie", "Sly", "Debbi", "Eudora", "Brittney", "Tacey", "Marius", "Anima", "Gordon", "Olivia", "Kortney",
+ "Shantel", "Kolleen", "Nevaeh", "Buck", "Sera", "Liliana", "Aric", "Kalyn", "Mick", "Libby", "Ingram",
+ "Alexandria", "Darleen", "Jacklyn", "Hughie", "Tyler", "Aida", "Ronda", "Deemer", "Taryn", "Laureen",
+ "Samantha", "Dave", "Hardy", "Baldric", "Montgomery", "Gus", "Ellis", "Titania", "Luke", "Chase", "Haidee",
+ "Mayra", "Isabell", "Trinity", "Milo", "Abigail", "Tacita", "Meg", "Hervey", "Natasha", "Sadie", "Holden",
+ "Dee", "Mansel", "Perry", "Randi", "Frederica", "Georgina", "Kolour", "Debbie", "Seraphina", "Elspet",
+ "Julyan", "Raven", "Zavia", "Jarvis", "Jaymes", "Grover", "Cairo", "Alea", "Jordon", "Braxton", "Donny",
+ "Rhoda", "Tonya", "Bee", "Alyssia", "Ashlyn", "Reanna", "Lonny", "Arlene", "Deb", "Jane", "Nikole",
+ "Bettina", "Harrison", "Tamzen", "Arielle", "Adelaide", "Faith", "Bridie", "Wilburn", "Fern", "Nan",
+ "Shaw", "Zeke", "Alan", "Dene", "Gina", "Alexa", "Bailey", "Sal", "Tammy", "Maximillian", "America",
+ "Sylvana", "Fitz", "Mo", "Marissa", "Cass", "Eldon", "Wilfrid", "Tel", "Joann", "Kendra", "Tolly",
+ "Leanne", "Ferdie", "Haven", "Lucas", "Marlee", "Cyrilla", "Red", "Phoenix", "Jazmin", "Carin", "Gena",
+ "Lashonda", "Tucker", "Genette", "Kizzy", "Winifred", "Melody", "Keely", "Kaylyn", "Radcliff", "Lettie",
+ "Foster", "Lyndsey", "Nicholas", "Farley", "Louisa", "Dana", "Dortha", "Francine", "Doran", "Bonita",
+ "Hal", "Sawyer", "Reginald", "Aislin", "Nathan", "Baylee", "Abilene", "Ladonna", "Maurine", "Shelly",
+ "Deandre", "Jasmin", "Roderic", "Tiffany", "Amanda", "Verity", "Wilford", "Gayelord", "Whitney", "Demelza",
+ "Kenton", "Alberta", "Kyra", "Tabitha", "Sampson", "Korey", "Lillian", "Edison", "Clayton", "Steph",
+ "Maya", "Dusty", "Jim", "Ronny", "Adrianne", "Bernard", "Harris", "Kiley", "Alexander", "Kisha", "Ethalyn",
+ "Patience", "Briony", "Indigo", "Aureole", "Makenzie", "Molly", "Sherilyn", "Barry", "Laverne", "Hunter",
+ "Rocky", "Tyreek", "Madalyn", "Phyliss", "Chet", "Beatrice", "Faye", "Lavina", "Madelyn", "Tracey",
+ "Gyles", "Patti", "Carlyn", "Stephanie", "Jackalyn", "Larrie", "Kimmy", "Isolda", "Emelina", "Lis",
+ "Zillah", "Cody", "Sheard", "Rufus", "Paget", "Mae", "Rexanne", "Luvinia", "Tamsen", "Rosanna", "Greig",
+ "Stacia", "Mabelle", "Quianna", "Lotus", "Delice", "Bradford", "Angus", "Cosmo", "Earlene", "Adrian",
+ "Arlie", "Noelle", "Sabella", "Isa", "Adelle", "Innocent", "Kirby", "Trixie", "Kenelm", "Nelda", "Melia",
+ "Kendal", "Dorinda", "Placid", "Linette", "Kam", "Sherisse", "Evan", "Ewart", "Janice", "Linton",
+ "Jacaline", "Charissa", "Douglas", "Aileen", "Kemp", "Oli", "Amethyst", "Rosie", "Nigella", "Sherill",
+ "Anderson", "Alanna", "Eric", "Claudia", "Jennifer", "Boniface", "Harriet", "Vernon", "Lucy", "Shawnee",
+ "Gerard", "Cecily", "Romey", "Randall", "Wade", "Lux", "Dawson", "Gregg", "Kade", "Roxanne", "Melinda",
+ "Rolland", "Rowanne", "Fannie", "Isidore", "Melia", "Harvie", "Salal", "Eleonor", "Jacquette", "Lavone",
+ "Shanika", "Tarquin", "Janet", "Josslyn", "Maegan", "Augusta", "Aubree", "Francene", "Martie", "Marisa",
+ "Tyreek", "Tatianna", "Caleb", "Sheridan", "Nellie", "Barbara", "Wat", "Jayla", "Esmaralda", "Graeme",
+ "Lavena", "Jemima", "Nikolas", "Triston", "Portia", "Kyla", "Marcus", "Raeburn", "Jamison", "Earl", "Wren",
+ "Leighton", "Lagina", "Lucasta", "Dina", "Amaranta", "Jessika", "Claud", "Bernard", "Winifred", "Ebba",
+ "Sammi", "Gall", "Chloe", "Ottoline", "Herbert", "Janice", "Gareth", "Channing", "Caleigh", "Kailee",
+ "Ralphie", "Tamzen", "Quincy", "Beaumont", "Albert", "Jadyn", "Violet", "Luanna", "Moriah", "Humbert",
+ "Jed", "Leona", "Hale", "Mitch", "Marlin", "Nivek", "Darwin", "Dirk", "Liliana", "Meadow", "Bernadine",
+ "Jorie", "Peyton", "Astra", "Roscoe", "Gina", "Lovell", "Jewel", "Romayne", "Rosy", "Imogene",
+ "Margaretta", "Lorinda", "Hopkin", "Bobby", "Flossie", "Bennie", "Horatio", "Jonah", "Lyn", "Deana",
+ "Juliana", "Blanch", "Wright", "Kendal", "Woodrow", "Tania", "Austyn", "Val", "Mona", "Charla", "Rudyard",
+ "Pamela", "Raven", "Zena", "Nicola", "Kaelea", "Conor", "Virgil", "Sonnie", "Goodwin", "Christianne",
+ "Linford", "Myron", "Denton", "Charita", "Brody", "Ginnie", "Harrison", "Jeanine", "Quin", "Isolda",
+ "Zoie", "Pearce", "Margie", "Larrie", "Angelina", "Marcia", "Jessamine", "Delilah", "Dick", "Luana",
+ "Delicia", "Lake", "Luvenia", "Vaughan", "Concordia", "Gayelord", "Cheyenne", "Felix", "Dorris", "Pen",
+ "Kristeen", "Parris", "Everitt", "Josephina", "Amy", "Tommie", "Adrian", "April", "Rosaline", "Zachery",
+ "Trace", "Phoebe", "Jenelle", "Kameron", "Katharine", "Media", "Colton", "Tad", "Quianna", "Kerenza",
+ "Greta", "Luvinia", "Pete", "Tonya", "Beckah", "Barbra", "Jon", "Tetty", "Corey", "Sylvana", "Kizzy",
+ "Korey", "Trey", "Haydee", "Penny", "Mandy", "Panda", "Coline", "Ramsey", "Sukie", "Annabel", "Sarina",
+ "Corbin", "Suzanna", "Rob", "Duana", "Shell", "Jason", "Eddy", "Rube", "Roseann", "Celia", "Brianne",
+ "Nerissa", "Jera", "Humphry", "Ashlynn", "Terrence", "Philippina", "Coreen", "Kolour", "Indiana", "Paget",
+ "Marlyn", "Hester", "Isbel", "Ocean", "Harris", "Leslie", "Vere", "Monroe", "Isabelle", "Bertie", "Clitus",
+ "Dave", "Alethea", "Lessie", "Louiza", "Madlyn", "Garland", "Wolf", "Lalo", "Donny", "Amabel", "Tianna",
+ "Louie", "Susie", "Mackenzie", "Renie", "Tess", "Marmaduke", "Gwendolen", "Bettina", "Beatrix", "Esmund",
+ "Minnie", "Carlie", "Barnabas", "Ruthie", "Honour", "Haylie", "Xavior", "Freddie", "Ericka", "Aretha",
+ "Edie", "Madelina", "Anson", "Tabby", "Derrick", "Jocosa", "Deirdre", "Aislin", "Chastity", "Abigail",
+ "Wynonna", "Zo", "Eldon", "Krystine", "Ghislaine", "Zavia", "Nolene", "Marigold", "Kelley", "Sylvester",
+ "Odell", "George", "Laurene", "Franklyn", "Clarice", "Mo", "Dustin", "Debbi", "Lina", "Tony", "Acacia",
+ "Hettie", "Natalee", "Marcie", "Brittany", "Elnora", "Rachel", "Dawn", "Basil", "Christal", "Anjelica",
+ "Fran", "Tawny", "Delroy", "Tameka", "Lillie", "Ceara", "Deanna", "Deshaun", "Ken", "Bradford", "Justina",
+ "Merle", "Draven", "Gretta", "Harriette", "Webster", "Nathaniel", "Anemone", "Coleen", "Ruth", "Chryssa",
+ "Hortensia", "Saffie", "Deonne", "Leopold", "Harlan", "Lea", "Eppie", "Lucinda", "Tilda", "Fanny", "Titty",
+ "Lockie", "Jepson", "Sherisse", "Maralyn", "Ethel", "Sly", "Ebenezer", "Canute", "Ella", "Freeman",
+ "Reuben", "Olivette", "Nona", "Rik", "Amice", "Kristine", "Kathie", "Jayne", "Jeri", "Mckenna", "Bertram",
+ "Kaylee", "Livia", "Gil", "Wallace", "Maryann", "Keeleigh", "Laurinda", "Doran", "Khloe", "Dakota",
+ "Yaron", "Kimberleigh", "Gytha", "Doris", "Marylyn", "Benton", "Linnette", "Esther", "Jakki", "Rowina",
+ "Marian", "Roselyn", "Norbert", "Maggie", "Caesar", "Phinehas", "Jerry", "Jasmine", "Antonette", "Miriam",
+ "Monna", "Maryvonne", "Jacquetta", "Bernetta", "Napier", "Annie", "Gladwin", "Sheldon", "Aric", "Elouise",
+ "Gawain", "Kristia", "Gabe", "Kyra", "Red", "Tod", "Dudley", "Lorraine", "Ryley", "Sabina", "Poppy",
+ "Leland", "Aileen", "Eglantine", "Alicia", "Jeni", "Addy", "Tiffany", "Geffrey", "Lavina", "Collin",
+ "Clover", "Vin", "Jerome", "Doug", "Vincent", "Florence", "Scarlet", "Celeste", "Desdemona", "Tiphanie",
+ "Kassandra", "Ashton", "Madison", "Art", "Magdalene", "Iona", "Josepha", "Anise", "Ferne", "Derek",
+ "Huffie", "Qiana", "Ysabel", "Tami", "Shannah", "Xavier", "Willard", "Winthrop", "Vickie", "Maura",
+ "Placid", "Tiara", "Reggie", "Elissa", "Isa", "Chrysanta", "Jeff", "Bessie", "Terri", "Amilia", "Brett",
+ "Daniella", "Damion", "Carolina", "Maximillian", "Travers", "Benjamin", "Oprah", "Darcy", "Yolanda",
+ "Nicolina", "Crofton", "Jarrett", "Kaitlin", "Shauna", "Keren", "Bevis", "Kalysta", "Sharron", "Alyssa",
+ "Blythe", "Zelma", "Caelie", "Norwood", "Billie", "Patrick", "Gary", "Cambria", "Tylar", "Mason", "Helen",
+ "Melyssa", "Gene", "Gilberta", "Carter", "Herbie", "Harmonie", "Leola", "Eugenia", "Clint", "Pauletta",
+ "Edwyna", "Georgina", "Teal", "Harper", "Izzy", "Dillon", "Kezia", "Evangeline", "Colene", "Madelaine",
+ "Zilla", "Rudy", "Dottie", "Caris", "Morton", "Marge", "Tacey", "Parker", "Troy", "Liza", "Lewin",
+ "Tracie", "Justine", "Dallas", "Linden", "Ray", "Loretta", "Teri", "Elvis", "Diane", "Julianna", "Manfred",
+ "Denise", "Eireen", "Ann", "Kenith", "Linwood", "Kathlyn", "Bernice", "Shelley", "Oswald", "Amedeus",
+ "Homer", "Tanzi", "Ted", "Ralphina", "Hyacinth", "Lotus", "Matthias", "Arlette", "Clark", "Cecil",
+ "Elspeth", "Alvena", "Noah", "Millard", "Brenden", "Cole", "Philipa", "Nina", "Thelma", "Iantha", "Reid",
+ "Jefferson", "Meg", "Elsie", "Shirlee", "Nathan", "Nancy", "Simona", "Racheal", "Carin", "Emory", "Delice",
+ "Kristi", "Karaugh", "Kaety", "Tilly", "Em", "Alanis", "Darrin", "Jerrie", "Hollis", "Cary", "Marly",
+ "Carita", "Jody", "Farley", "Hervey", "Rosalin", "Cuthbert", "Stewart", "Jodene", "Caileigh", "Briscoe",
+ "Dolores", "Sheree", "Eustace", "Nigel", "Detta", "Barret", "Rowland", "Kenny", "Githa", "Zoey", "Adela",
+ "Petronella", "Opal", "Coleman", "Niles", "Cyril", "Dona", "Alberic", "Allannah", "Jules", "Avalon",
+ "Hadley", "Thomas", "Renita", "Calanthe", "Heron", "Shawnda", "Chet", "Malina", "Manny", "Rina", "Frieda",
+ "Eveleen", "Deshawn", "Amos", "Raelene", "Paige", "Molly", "Nannie", "Ileen", "Brendon", "Milford",
+ "Unice", "Rebeccah", "Caedmon", "Gae", "Doreen", "Vivian", "Louis", "Raphael", "Vergil", "Lise", "Glenn",
+ "Karyn", "Terance", "Reina", "Jake", "Gordon", "Wisdom", "Isiah", "Gervase", "Fern", "Marylou", "Roddy",
+ "Justy", "Derick", "Shantelle", "Adam", "Chantel", "Madoline", "Emmerson", "Lexie", "Mickey", "Stephen",
+ "Dane", "Stacee", "Elwin", "Tracey", "Alexandra", "Ricky", "Ian", "Kasey", "Rita", "Alanna", "Georgene",
+ "Deon", "Zavier", "Ophelia", "Deforest", "Lowell", "Zubin", "Hardy", "Osmund", "Tabatha", "Debby",
+ "Katlyn", "Tallulah", "Priscilla", "Braden", "Wil", "Keziah", "Jen", "Aggie", "Korbin", "Lemoine",
+ "Barnaby", "Tranter", "Goldie", "Roderick", "Trina", "Emery", "Pris", "Sidony", "Adelle", "Tate", "Wilf",
+ "Zola", "Brande", "Chris", "Calanthia", "Lilly", "Kaycee", "Lashonda", "Jasmin", "Elijah", "Shantel",
+ "Simon", "Rosalind", "Jarod", "Kaylie", "Corrine", "Joselyn", "Archibald", "Mariabella", "Winton",
+ "Merlin", "Chad", "Ursula", "Kristopher", "Hewie", "Adrianna", "Lyndsay", "Jasmyn", "Tim", "Evette",
+ "Margaret", "Samson", "Bronte", "Terence", "Leila", "Candice", "Tori", "Jamey", "Coriander", "Conrad",
+ "Floyd", "Karen", "Lorin", "Maximilian", "Cairo", "Emily", "Yasmin", "Karolyn", "Bryan", "Lanny",
+ "Kimberly", "Rick", "Chaz", "Krystle", "Lyric", "Laura", "Garrick", "Flip", "Monty", "Brendan",
+ "Ermintrude", "Rayner", "Merla", "Titus", "Marva", "Patricia", "Leone", "Tracy", "Jaqueline", "Hallam",
+ "Delores", "Cressida", "Carlyle", "Leann", "Kelcey", "Laurence", "Ryan", "Reynold", "Mark", "Collyn",
+ "Audie", "Sammy", "Ellery", "Sallie", "Pamelia", "Adolph", "Lydia", "Titania", "Ron", "Bridger", "Aline",
+ "Read", "Kelleigh", "Weldon", "Irving", "Garey", "Diggory", "Evander", "Kylee", "Deidre", "Ormond",
+ "Laurine", "Reannon", "Arline", "Pat"
+
+ };
+
+ public static String[] jargon = { "wireless", "signal", "network", "3G", "plan", "touch-screen",
+ "customer-service", "reachability", "voice-command", "shortcut-menu", "customization", "platform", "speed",
+ "voice-clarity", "voicemail-service" };
+
+ public static String[] vendors = { "at&t", "verizon", "t-mobile", "sprint", "motorola", "samsung", "iphone" };
+
+ public static String[] org_list = { "Latsonity", "ganjalax", "Zuncan", "Lexitechno", "Hot-tech", "subtam",
+ "Coneflex", "Ganjatax", "physcane", "Tranzap", "Qvohouse", "Zununoing", "jaydax", "Keytech", "goldendexon",
+ "Villa-tech", "Trustbam", "Newcom", "Voltlane", "Ontohothex", "Ranhotfan", "Alphadax", "Transhigh",
+ "kin-ron", "Doublezone", "Solophase", "Vivaace", "silfind", "Basecone", "sonstreet", "Freshfix",
+ "Techitechi", "Kanelectrics", "linedexon", "Goldcity", "Newfase", "Technohow", "Zimcone", "Salthex",
+ "U-ron", "Solfix", "whitestreet", "Xx-technology", "Hexviafind", "over-it", "Strongtone", "Tripplelane",
+ "geomedia", "Scotcity", "Inchex", "Vaiatech", "Striptaxon", "Hatcom", "tresline", "Sanjodax", "freshdox",
+ "Sumlane", "Quadlane", "Newphase", "overtech", "Voltbam", "Icerunin", "Fixdintex", "Hexsanhex", "Statcode",
+ "Greencare", "U-electrics", "Zamcorporation", "Ontotanin", "Tanzimcare", "Groovetex", "Ganjastrip",
+ "Redelectronics", "Dandamace", "Whitemedia", "strongex", "Streettax", "highfax", "Mathtech", "Xx-drill",
+ "Sublamdox", "Unijobam", "Rungozoom", "Fixelectrics", "Villa-dox", "Ransaofan", "Plexlane", "itlab",
+ "Lexicone", "Fax-fax", "Viatechi", "Inchdox", "Kongreen", "Doncare", "Y-geohex", "Opeelectronics",
+ "Medflex", "Dancode", "Roundhex", "Labzatron", "Newhotplus", "Sancone", "Ronholdings", "Quoline",
+ "zoomplus", "Fix-touch", "Codetechno", "Tanzumbam", "Indiex", "Canline" };
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
new file mode 100644
index 0000000..703aacb
--- /dev/null
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.asterix.tools.external.data;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.tools.external.data.DataGenerator2.InitializationInfo;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator2.TweetMessage;
+import edu.uci.ics.asterix.tools.external.data.DataGenerator2.TweetMessageIterator;
+
+public class TweetGenerator2 {
+
+ private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
+ public static final String KEY_DURATION = "duration";
+ public static final String KEY_TPS = "tps";
+ public static final String KEY_TPUT_DURATION = "tput-duration";
+
+ public static final String OUTPUT_FORMAT = "output-format";
+
+ public static final String OUTPUT_FORMAT_ARECORD = "arecord";
+ public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
+
+ private int duration;
+ private long tweetInterval;
+ private int numTweetsBeforeDelay;
+ private TweetMessageIterator tweetIterator = null;
+ private long exeptionInterval;
+
+
+
+ private int partition;
+ private int tweetCount = 0;
+ private int frameTweetCount = 0;
+ private int numFlushedTweets = 0;
+
+ public int getTweetCount() {
+ return tweetCount;
+ }
+
+ private int exceptionPeriod = -1;
+ private boolean isOutputFormatRecord = false;
+ private byte[] EOL = "\n".getBytes();
+ private OutputStream os;
+ private DataGenerator2 dataGenerator = null;
+ private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
+ private int flushedTweetCount = 0;
+
+ public TweetGenerator2(Map<String, String> configuration, int partition, String format) throws Exception {
+ String value = configuration.get(KEY_DURATION);
+ duration = value != null ? Integer.parseInt(value) : 60;
+ initializeTweetRate(configuration.get(KEY_TPS));
+ if (value != null) {
+ exceptionPeriod = Integer.parseInt(value);
+ }
+
+ isOutputFormatRecord = format.equalsIgnoreCase(OUTPUT_FORMAT_ARECORD);
+ InitializationInfo info = new InitializationInfo();
+ info.timeDurationInSecs = duration;
+ dataGenerator = new DataGenerator2(info);
+ tweetIterator = dataGenerator.new TweetMessageIterator(duration);
+ }
+
+ private void initializeTweetRate(String tps) {
+ numTweetsBeforeDelay = 0;
+ if (tps == null) {
+ tweetInterval = 0;
+ } else {
+ int val = Integer.parseInt(tps);
+ double interval = new Double(((double) 1000 / val));
+ if (interval > 1) {
+ tweetInterval = (long) interval;
+ numTweetsBeforeDelay = 1;
+ } else {
+ tweetInterval = 1;
+ Double numTweets = new Double(1 / interval);
+ if (numTweets.intValue() != numTweets) {
+ tweetInterval = 5;
+ numTweetsBeforeDelay = (new Double(10 * numTweets * 1)).intValue();
+ } else {
+ numTweetsBeforeDelay = new Double((numTweets * 1)).intValue();
+ }
+ }
+ }
+
+ }
+
+ private void writeTweetString(TweetMessage next) throws IOException {
+ String tweet = next.toString() + "\n";
+ tweetCount++;
+ byte[] b = tweet.getBytes();
+ if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+ flush();
+ numFlushedTweets += frameTweetCount;
+ frameTweetCount = 0;
+ flushedTweetCount += tweetCount - 1;
+ outputBuffer.put(tweet.getBytes());
+ frameTweetCount++;
+
+ } else {
+ outputBuffer.put(tweet.getBytes());
+ frameTweetCount++;
+ }
+ }
+
+ public int getNumFlushedTweets() {
+ return numFlushedTweets;
+ }
+
+ public int getFrameTweetCount() {
+ return frameTweetCount;
+ }
+
+ private void flush() throws IOException {
+ outputBuffer.flip();
+ os.write(outputBuffer.array(), 0, outputBuffer.limit());
+ outputBuffer.position(0);
+ outputBuffer.limit(32 * 1024);
+ }
+
+ private void writeTweetRecord(TweetMessage next) {
+ throw new UnsupportedOperationException("Invalid format");
+ }
+
+ public boolean setNextRecord() throws Exception {
+ boolean moreData = tweetIterator.hasNext();
+ if (!moreData) {
+ return false;
+ }
+ TweetMessage msg = tweetIterator.next();
+ if (isOutputFormatRecord) {
+ writeTweetRecord(msg);
+ } else {
+ writeTweetString(msg);
+ }
+ if (tweetInterval != 0) {
+ tweetCount++;
+ if (tweetCount == numTweetsBeforeDelay) {
+ Thread.sleep(tweetInterval);
+ tweetCount = 0;
+ }
+ }
+ return true;
+ }
+
+ public boolean setNextRecordBatch(int numTweetsInBatch) throws Exception {
+ int count = 0;
+ // if (tweetIterator.hasNext()) {
+ while (count < numTweetsInBatch) {
+ writeTweetString(tweetIterator.next());
+ count++;
+ }
+ // } else {
+ // System.out.println("Flushing last batch, count so far:" + tweetCount);
+ // flush();
+ /// }
+ return true;
+ }
+
+ public void setOutputStream(OutputStream os) {
+ this.os = os;
+ }
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index d59a09e..94250d2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -6,12 +6,15 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
+import java.util.Date;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
@@ -37,16 +40,19 @@
private static final String LOCALHOST = "127.0.0.1";
private static final int PORT = 2909;
+ private ExecutorService executorService = Executors.newCachedThreadPool();
+
public TwitterFirehoseFeedAdapter(Map<String, String> configuration, ITupleParserFactory parserFactory,
- ARecordType outputtype, IHyracksTaskContext ctx) throws AsterixException, IOException {
+ ARecordType outputtype, IHyracksTaskContext ctx) throws Exception {
super(parserFactory, outputtype, ctx);
- this.twitterServer = new TwitterServer(configuration, outputtype);
+ this.twitterServer = new TwitterServer(configuration, outputtype, executorService);
this.twitterClient = new TwitterClient(twitterServer.getPort());
}
@Override
public void start(int partition, IFrameWriter writer) throws Exception {
twitterServer.start();
+ twitterServer.getListener().setPartition(partition);
twitterClient.start();
super.start(partition, writer);
}
@@ -60,9 +66,10 @@
private ServerSocket serverSocket;
private final Listener listener;
private int port = -1;
+ private ExecutorService executorService;
- public TwitterServer(Map<String, String> configuration, ARecordType outputtype) throws IOException,
- AsterixException {
+ public TwitterServer(Map<String, String> configuration, ARecordType outputtype, ExecutorService executorService)
+ throws Exception {
int numAttempts = 0;
while (port < 0) {
try {
@@ -78,16 +85,22 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Twitter server configured to use port: " + port);
}
- listener = new Listener(serverSocket, configuration, outputtype);
+ String dvds = configuration.get("dataverse-dataset");
+ listener = new Listener(serverSocket, configuration, outputtype, dvds);
+ this.executorService = executorService;
+ }
+
+ public Listener getListener() {
+ return listener;
}
public void start() {
- Thread t = new Thread(listener);
- t.start();
+ executorService.execute(listener);
}
- public void stop() {
+ public void stop() throws IOException {
listener.stop();
+ serverSocket.close();
}
public int getPort() {
@@ -112,40 +125,86 @@
public void start() throws UnknownHostException, IOException {
socket = new Socket(LOCALHOST, port);
}
+
}
private static class Listener implements Runnable {
private final ServerSocket serverSocket;
private Socket socket;
- private TweetGenerator tweetGenerator;
+ private TweetGenerator2 tweetGenerator;
private boolean continuePush = true;
+ private int tps;
+ private int tputDuration;
+ private int partition;
+ private Rate task;
+ private Mode mode;
- public Listener(ServerSocket serverSocket, Map<String, String> configuration, ARecordType outputtype)
- throws IOException, AsterixException {
+ public static final String KEY_MODE = "mode";
+
+ public static enum Mode {
+ AGGRESSIVE,
+ CONTROLLED
+ }
+
+ public void setPartition(int partition) {
+ this.partition = partition;
+ task.setPartition(partition);
+ }
+
+ public Listener(ServerSocket serverSocket, Map<String, String> configuration, ARecordType outputtype,
+ String datasetName) throws Exception {
this.serverSocket = serverSocket;
- this.tweetGenerator = new TweetGenerator(configuration, outputtype, 0,
- TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
+ this.tweetGenerator = new TweetGenerator2(configuration, 0, TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
+ tps = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPS));
+ tputDuration = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPUT_DURATION));
+ task = new Rate(tweetGenerator, tputDuration, datasetName, partition);
+ String value = configuration.get(KEY_MODE);
+ if (value != null) {
+ mode = Mode.valueOf(value.toUpperCase());
+ } else {
+ mode = Mode.AGGRESSIVE;
+ }
+
}
@Override
public void run() {
while (true) {
- InflowState state = InflowState.DATA_AVAILABLE;
try {
socket = serverSocket.accept();
OutputStream os = socket.getOutputStream();
tweetGenerator.setOutputStream(os);
- while (state.equals(InflowState.DATA_AVAILABLE) && continuePush) {
- state = tweetGenerator.setNextRecord();
+ boolean moreData = true;
+ Timer timer = new Timer();
+ timer.schedule(task, tputDuration * 1000, tputDuration * 1000);
+ long startBatch;
+ long endBatch;
+ while (moreData && continuePush) {
+ startBatch = System.currentTimeMillis();
+ moreData = tweetGenerator.setNextRecordBatch(tps);
+ endBatch = System.currentTimeMillis();
+ if (mode.equals(Mode.CONTROLLED)) {
+ if (endBatch - startBatch < 1000) {
+ Thread.sleep(1000 - (endBatch - startBatch));
+ }
+ }
}
+ timer.cancel();
os.close();
break;
} catch (Exception e) {
-
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in adaptor " + e.getMessage());
+ }
} finally {
try {
- socket.close();
+ if (socket != null && socket.isClosed()) {
+ socket.close();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed socket:" + socket.getPort());
+ }
+ }
} catch (IOException e) {
e.printStackTrace();
}
@@ -158,6 +217,45 @@
continuePush = false;
}
+ private static class Rate extends TimerTask {
+
+ private TweetGenerator2 gen;
+ int prevMeasuredTweets = 0;
+ private int tputDuration;
+ private int partition;
+ private String dataset;
+
+ public Rate(TweetGenerator2 gen, int tputDuration, String dataset, int partition) {
+ this.gen = gen;
+ this.tputDuration = tputDuration;
+ this.dataset = dataset;
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(new Date() + " " + "Dataset" + " " + "partition" + " " + "Total flushed tweets"
+ + "\t" + "intantaneous throughput");
+ }
+ }
+
+ @Override
+ public void run() {
+
+ int currentMeasureTweets = gen.getNumFlushedTweets();
+
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(dataset + " " + partition + " " + gen.getNumFlushedTweets() + "\t"
+ + ((currentMeasureTweets - prevMeasuredTweets) / tputDuration) + " ID "
+ + Thread.currentThread().getId());
+ }
+
+ prevMeasuredTweets = currentMeasureTweets;
+
+ }
+
+ public void setPartition(int partition) {
+ this.partition = partition;
+ }
+
+ }
+
}
@Override
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index b0ff634..8bb0146 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -28,6 +28,7 @@
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;