checkpoint
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java
index ff600ce..c29a96e 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/AsterixConstants.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.event.util;
public class AsterixConstants {
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java
index e06d66c..9af9307 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/util/PatternCreator.java
@@ -499,6 +499,9 @@
for (Node node : cluster.getNode()) {
nodeid = new Nodeid(new Value(null, node.getId()));
+ if (node.getLogDir() != null) {
+ pargs = node.getLogDir();
+ }
event = new Event("file_delete", nodeid, pargs);
patternList.add(new Pattern(null, 1, null, event));
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index bf89ccb..fa66715 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -19,7 +19,7 @@
import java.util.List;
import java.util.Map;
-import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
@@ -81,7 +81,7 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter(configuration, recordType, ctx);
+ RSSFeedAdapter cnnFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
return cnnFeedAdapter;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 054fee3..669dc61 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,49 +14,23 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
-import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
-import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
/**
* A factory class for creating an instance of HiveAdapter
*/
-@SuppressWarnings("deprecation")
public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
private static final long serialVersionUID = 1L;
- public static final String HDFS_ADAPTER_NAME = "hdfs";
- public static final String CLUSTER_LOCATIONS = "cluster-locations";
- public static transient String SCHEDULER = "hdfs-scheduler";
-
- public static final String KEY_HDFS_URL = "hdfs";
- public static final String KEY_PATH = "path";
- public static final String KEY_INPUT_FORMAT = "input-format";
- public static final String INPUT_FORMAT_TEXT = "text-input-format";
- public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
- public static final String KEY_FORMAT = "format";
- public static final String KEY_PARSER_FACTORY = "parser";
- public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
- public static final String FORMAT_ADM = "adm";
-
public static final String HIVE_DATABASE = "database";
public static final String HIVE_TABLE = "table";
public static final String HIVE_HOME = "hive-home";
@@ -64,30 +38,19 @@
public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
- private String[] readSchedule;
- private boolean executed[];
- private InputSplitsFactory inputSplitsFactory;
- private ConfFactory confFactory;
- private transient AlgebricksPartitionConstraint clusterLocations;
+ private HDFSAdapterFactory hdfsAdapterFactory;
+ private HDFSAdapter hdfsAdapter;
private boolean configured = false;
private IAType atype;
- private static final Map<String, String> formatClassNames = initInputFormatMap();
-
- private static Map<String, String> initInputFormatMap() {
- Map<String, String> formatClassNames = new HashMap<String, String>();
- formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
- formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
- return formatClassNames;
+ public HiveAdapterFactory() {
+ hdfsAdapterFactory = new HDFSAdapterFactory();
}
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- JobConf conf = confFactory.getConf();
- InputSplit[] inputSplits = inputSplitsFactory.getSplits();
- String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
- HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations,
- nodeName, parserFactory, ctx);
+ hdfsAdapter = (HDFSAdapter) hdfsAdapterFactory.createAdapter(ctx, partition);
+ HiveAdapter hiveAdapter = new HiveAdapter(atype, hdfsAdapter, parserFactory, ctx);
return hiveAdapter;
}
@@ -109,32 +72,12 @@
@Override
public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
if (!configured) {
- /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
- configureJobConf(configuration);
- JobConf conf = configureJobConf(configuration);
- confFactory = new ConfFactory(conf);
-
- clusterLocations = AsterixClusterProperties.INSTANCE.getClusterLocations();
- int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
-
- InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
- inputSplitsFactory = new InputSplitsFactory(inputSplits);
-
- Scheduler scheduler = HDFSAdapterFactory.hdfsScheduler;
- readSchedule = scheduler.getLocationConstraints(inputSplits);
- executed = new boolean[readSchedule.length];
- Arrays.fill(executed, false);
-
- atype = (IAType) outputType;
- configureFormat(atype);
- configured = true;
+ populateConfiguration(configuration);
+ hdfsAdapterFactory.configure(configuration, outputType);
}
-
}
- private JobConf configureJobConf(Map<String, String> configuration) throws Exception {
- JobConf conf = new JobConf();
-
+ private void populateConfiguration(Map<String, String> configuration) throws Exception {
/** configure hive */
String database = (String) configuration.get(HIVE_DATABASE);
String tablePath = null;
@@ -154,21 +97,11 @@
throw new IllegalArgumentException("file input format"
+ configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
}
-
- /** configure hdfs */
- conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
- conf.set("mapred.input.format.class",
- (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
- return conf;
}
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- // TODO Auto-generated method stub
- return null;
+ return hdfsAdapterFactory.getPartitionConstraint();
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
deleted file mode 100644
index 017b511..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.dataset.adapter;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-
-/**
- * An Adapter that provides the functionality of fetching news feed from CNN service
- * The Adapter provides news feed as ADM records.
- */
-public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IFeedAdapter {
-
- private static final long serialVersionUID = 1L;
-
- public CNNFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
- throws AsterixException {
- super(configuration, recordType, ctx);
- }
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e694e2..1a046a5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.mapred.TextInputFormat;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index f4ff44e..6280635 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -17,11 +17,7 @@
import java.io.IOException;
import java.io.InputStream;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -29,26 +25,16 @@
/**
* Provides the functionality of fetching data in form of ADM records from a Hive dataset.
*/
-@SuppressWarnings("deprecation")
public class HiveAdapter extends FileSystemBasedAdapter {
private static final long serialVersionUID = 1L;
- public static final String HIVE_DATABASE = "database";
- public static final String HIVE_TABLE = "table";
- public static final String HIVE_HOME = "hive-home";
- public static final String HIVE_METASTORE_URI = "metastore-uri";
- public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
- public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
-
private HDFSAdapter hdfsAdapter;
- public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
- AlgebricksPartitionConstraint clusterLocations, String nodeName, ITupleParserFactory parserFactory,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ public HiveAdapter(IAType atype, HDFSAdapter hdfsAdapter, ITupleParserFactory parserFactory, IHyracksTaskContext ctx)
+ throws HyracksDataException {
super(parserFactory, atype, ctx);
- this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, nodeName, parserFactory,
- ctx);
+ this.hdfsAdapter = hdfsAdapter;
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
index 2e79679..be3a2fd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IPullBasedFeedClient.java
@@ -39,14 +39,4 @@
*/
public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
- /**
- * Provides logic for any corrective action that feed client needs to execute on
- * encountering an exception.
- *
- * @param e
- * The exception encountered during fetching of data from external source
- * @throws AsterixException
- */
- public void resetOnFailure(Exception e) throws AsterixException;
-
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 3f3ca50..193cce4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -20,9 +20,8 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
-import edu.uci.ics.asterix.metadata.feeds.AbstractFeedDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyEnforcer;
+import edu.uci.ics.asterix.metadata.feeds.IPullBasedFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -36,8 +35,7 @@
* the common logic for obtaining bytes from an external source and packing them
* into frames as tuples.
*/
-public abstract class PullBasedAdapter extends AbstractFeedDatasourceAdapter implements IDatasourceAdapter,
- IFeedAdapter {
+public abstract class PullBasedAdapter implements IPullBasedFeedAdapter {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(PullBasedAdapter.class.getName());
@@ -55,6 +53,16 @@
private final IHyracksTaskContext ctx;
private int frameTupleCount = 0;
+ protected FeedPolicyEnforcer policyEnforcer;
+
+ public FeedPolicyEnforcer getPolicyEnforcer() {
+ return policyEnforcer;
+ }
+
+ public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+ this.policyEnforcer = policyEnforcer;
+ }
+
public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
public PullBasedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
@@ -112,7 +120,6 @@
failureException.printStackTrace();
boolean continueIngestion = policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
if (continueIngestion) {
- pullBasedFeedClient.resetOnFailure(failureException);
tupleBuilder.reset();
continue;
} else {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 5aafef4..e9cc21b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -49,4 +49,9 @@
return recordType;
}
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PULL;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 0cd14b8..75f8fba 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -17,7 +17,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.logging.Logger;
import twitter4j.Query;
import twitter4j.QueryResult;
@@ -25,7 +24,6 @@
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import edu.uci.ics.asterix.om.base.AMutableRecord;
import edu.uci.ics.asterix.om.base.AMutableString;
@@ -51,8 +49,6 @@
private ARecordType recordType;
private int nextTweetIndex = 0;
- private static final Logger LOGGER = Logger.getLogger(PullBasedTwitterFeedClient.class.getName());
-
public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, PullBasedTwitterAdapter adapter) {
twitter = new TwitterFactory().getInstance();
mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
@@ -92,11 +88,6 @@
return InflowState.DATA_AVAILABLE;
}
- @Override
- public void resetOnFailure(Exception e) throws AsterixException {
- // TOOO: implement resetting logic for Twitter
- }
-
private void initialize(Map<String, String> params) {
this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 6ace5c5..4eea034 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -30,16 +30,14 @@
private static final long serialVersionUID = 1L;
+ private static final String KEY_RSS_URL = "rss_url";
+
private List<String> feedURLs = new ArrayList<String>();
- private boolean isStopRequested = false;
private String id_prefix = "";
private IPullBasedFeedClient rssFeedClient;
- private ARecordType recordType;
- public boolean isStopRequested() {
- return isStopRequested;
- }
+ private ARecordType recordType;
public RSSFeedAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
throws AsterixException {
@@ -48,15 +46,6 @@
this.recordType = recordType;
}
- public void setStopRequested(boolean isStopRequested) {
- this.isStopRequested = isStopRequested;
- }
-
- @Override
- public void stop() {
- isStopRequested = true;
- }
-
private void initializeFeedURLs(String rssURLProperty) {
feedURLs.clear();
String[] feedURLProperty = rssURLProperty.split(",");
@@ -66,7 +55,7 @@
}
protected void reconfigure(Map<String, String> arguments) {
- String rssURLProperty = configuration.get("KEY_RSS_URL");
+ String rssURLProperty = configuration.get(KEY_RSS_URL);
if (rssURLProperty != null) {
initializeFeedURLs(rssURLProperty);
}
@@ -84,4 +73,9 @@
return recordType;
}
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PULL;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 0522d42..41ed923 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -18,7 +18,6 @@
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Queue;
import com.sun.syndication.feed.synd.SyndEntryImpl;
@@ -42,7 +41,6 @@
@SuppressWarnings("rawtypes")
public class RSSFeedClient extends PullBasedFeedClient {
- private final String feedURL;
private long id = 0;
private String idPrefix;
private boolean feedModified = false;
@@ -67,9 +65,8 @@
}
public RSSFeedClient(RSSFeedAdapter adapter, String feedURL, String id_prefix) throws MalformedURLException {
- this.feedURL = feedURL;
this.idPrefix = id_prefix;
- feedUrl = new URL(feedURL);
+ this.feedUrl = new URL(feedURL);
feedInfoCache = HashMapFeedInfoCache.getInstance();
fetcher = new HttpURLFeedFetcher(feedInfoCache);
listener = new FetcherEventListenerImpl(this);
@@ -132,12 +129,6 @@
}
}
- @Override
- public void resetOnFailure(Exception e) {
- // TODO Auto-generated method stub
-
- }
-
}
class FetcherEventListenerImpl implements FetcherListener {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index 1871111..f09a841 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -5,7 +5,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -20,14 +19,11 @@
protected static final Logger LOGGER = Logger.getLogger(StreamBasedAdapter.class.getName());
- public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
-
public abstract InputStream getInputStream(int partition) throws IOException;
protected final ITupleParser tupleParser;
+
protected final IAType sourceDatatype;
- protected IHyracksTaskContext ctx;
- protected AdapterRuntimeManager runtimeManager;
public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
throws HyracksDataException {
@@ -46,4 +42,5 @@
}
}
}
+
}
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
index 63d2f33..5b024ec 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
@@ -16,9 +16,6 @@
import java.io.File;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Unmarshaller;
-
import org.kohsuke.args4j.Option;
import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
index 53ec136..8461518 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerDriver.java
@@ -24,7 +24,6 @@
import edu.uci.ics.asterix.event.service.AsterixEventService;
import edu.uci.ics.asterix.event.service.ILookupService;
import edu.uci.ics.asterix.event.service.ServiceProvider;
-import edu.uci.ics.asterix.event.util.PatternCreator;
import edu.uci.ics.asterix.installer.command.CommandHandler;
import edu.uci.ics.asterix.installer.schema.conf.Configuration;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
deleted file mode 100644
index 2dc7c56..0000000
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package edu.uci.ics.asterix.metadata.feeds;
-
-public abstract class AbstractFeedDatasourceAdapter implements IDatasourceAdapter {
-
- private static final long serialVersionUID = 1L;
-
- protected FeedPolicyEnforcer policyEnforcer;
-
- public FeedPolicyEnforcer getPolicyEnforcer() {
- return policyEnforcer;
- }
-
- public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
- this.policyEnforcer = policyEnforcer;
- }
-
-}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 3dbd2e2..24c6281 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -27,6 +27,7 @@
import edu.uci.ics.asterix.common.feeds.FeedRuntimeManager;
import edu.uci.ics.asterix.common.feeds.IFeedManager;
import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter.DataExchangeMode;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -79,8 +80,8 @@
adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, feedFrameWriter, partition, inbox,
feedManager);
- if (adapter instanceof AbstractFeedDatasourceAdapter) {
- ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+ if (adapter.getDataExchangeMode().equals(DataExchangeMode.PULL)) {
+ ((IPullBasedFeedAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Beginning new feed:" + feedId);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
index f525420..55abd73 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedAdapter.java
@@ -20,6 +20,16 @@
*/
public interface IFeedAdapter extends IDatasourceAdapter {
+ public enum DataExchangeMode {
+ PULL,
+ PUSH
+ }
+
+ /**
+ * @return the data exchange mode ({@code PULL} or {@code PUSH}) of this adapter
+ */
+ public DataExchangeMode getDataExchangeMode();
+
/**
* Discontinue the ingestion of data and end the feed.
*
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
new file mode 100644
index 0000000..50641b0
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IPullBasedFeedAdapter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+public interface IPullBasedFeedAdapter extends IFeedAdapter {
+
+ /**
+ * @return the {@link FeedPolicyEnforcer} associated with this adapter
+ */
+ public FeedPolicyEnforcer getPolicyEnforcer();
+
+ /**
+ * @param feedPolicyEnforcer the {@link FeedPolicyEnforcer} to associate with this adapter
+ */
+ public void setFeedPolicyEnforcer(FeedPolicyEnforcer feedPolicyEnforcer);
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index b6f693a..ce22887 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.tools.external.data;
import java.nio.CharBuffer;
@@ -7,8 +21,6 @@
import java.util.List;
import java.util.Random;
-import edu.uci.ics.asterix.tools.external.data.TweetGenerator.DataMode;
-
public class DataGenerator {
private RandomDateGenerator randDateGen;
@@ -32,20 +44,13 @@
public class TweetMessageIterator implements Iterator<TweetMessage> {
private final int duration;
- private final GULongIDGenerator[] idGens;
+ private final GULongIDGenerator idGen;
private long startTime = 0;
- private int uidGenInUse = 0;
- private TweetMessage dummyMessage;
- private DataMode dataMode;
- public TweetMessageIterator(int duration, GULongIDGenerator[] idGens, DataMode dataMode) {
+ public TweetMessageIterator(int duration, GULongIDGenerator idGen) {
this.duration = duration;
- this.idGens = idGens;
+ this.idGen = idGen;
this.startTime = System.currentTimeMillis();
- if (dataMode.equals(DataMode.REUSE_DATA)) {
- dummyMessage = next();
- }
- this.dataMode = dataMode;
}
@Override
@@ -56,32 +61,18 @@
@Override
public TweetMessage next() {
TweetMessage msg = null;
- switch (dataMode) {
- case NEW_DATA:
- getTwitterUser(null);
- Message message = randMessageGen.getNextRandomMessage();
- Point location = randLocationGen.getRandomPoint();
- DateTime sendTime = randDateGen.getNextRandomDatetime();
- twMessage.reset(idGens[uidGenInUse].getNextULong(), twUser, location, sendTime,
- message.getReferredTopics(), message);
- msg = twMessage;
- break;
- case REUSE_DATA:
- dummyMessage.setTweetid(idGens[uidGenInUse].getNextULong());
- msg = dummyMessage;
- break;
- }
+ getTwitterUser(null);
+ Message message = randMessageGen.getNextRandomMessage();
+ Point location = randLocationGen.getRandomPoint();
+ DateTime sendTime = randDateGen.getNextRandomDatetime();
+ twMessage.reset(idGen.getNextULong(), twUser, location, sendTime, message.getReferredTopics(), message);
+ msg = twMessage;
return msg;
}
@Override
public void remove() {
// TODO Auto-generated method stub
-
- }
-
- public void toggleUidKeySpace() {
- uidGenInUse = (uidGenInUse + 1) % idGens.length;
}
}
@@ -130,10 +121,10 @@
public RandomDateGenerator(Date startDate, Date endDate) {
this.startDate = startDate;
this.endDate = endDate;
- yearDifference = endDate.getYear() - startDate.getYear() + 1;
- workingDate = new Date();
- recentDate = new Date();
- dateTime = new DateTime();
+ this.yearDifference = endDate.getYear() - startDate.getYear() + 1;
+ this.workingDate = new Date();
+ this.recentDate = new Date();
+ this.dateTime = new DateTime();
}
public Date getStartDate() {
@@ -1167,4 +1158,5 @@
"Lexicone", "Fax-fax", "Viatechi", "Inchdox", "Kongreen", "Doncare", "Y-geohex", "Opeelectronics",
"Medflex", "Dancode", "Roundhex", "Labzatron", "Newhotplus", "Sancone", "Ronholdings", "Quoline",
"zoomplus", "Fix-touch", "Codetechno", "Tanzumbam", "Indiex", "Canline" };
+
}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
index 278565a..20b9be1 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapter.java
@@ -5,7 +5,6 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Level;
-import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
@@ -19,8 +18,6 @@
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(GenericSocketFeedAdapter.class.getName());
-
private SocketFeedServer socketFeedServer;
public GenericSocketFeedAdapter(ITupleParserFactory parserFactory, ARecordType outputtype, int port,
@@ -89,4 +86,9 @@
socketFeedServer.stop();
}
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PUSH; // same annotation as the other feed adapters in this change
+ }
+
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 4fd0453..dbe3611 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -1,5 +1,5 @@
/*
-x * Copyright 2009-2012 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -28,7 +28,6 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -102,11 +101,11 @@
mode = Mode.valueOf(modeValue.trim().toUpperCase());
}
String socketsValue = configuration.get(KEY_SOCKETS);
- Map<String, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
- List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
if (socketsValue == null) {
throw new IllegalArgumentException("\'sockets\' parameter not specified as part of adaptor configuration");
}
+ Map<String, Set<String>> ncMap = AsterixRuntimeUtil.getNodeControllerMap();
+ List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
String[] socketsArray = socketsValue.split(",");
Random random = new Random();
for (String socket : socketsArray) {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index a92cb55..8f169f2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -51,5 +51,10 @@
public void stop() {
((RateControlledTupleParser) tupleParser).stop();
}
+
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PULL;
+ }
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
index 28ff14d..9c390c1 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator.java
@@ -1,9 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.tools.external.data;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.tools.external.data.DataGenerator.InitializationInfo;
import edu.uci.ics.asterix.tools.external.data.DataGenerator.TweetMessage;
@@ -11,19 +27,17 @@
public class TweetGenerator {
- public static final String NUM_KEY_SPACES = "num-key-spaces";
+ private static final Logger LOGGER = Logger.getLogger(TweetGenerator.class.getName());
+
public static final String KEY_DURATION = "duration";
public static final String KEY_TPS = "tps";
- public static final String KEY_TPUT_DURATION = "tput-duration";
public static final String KEY_GUID_SEED = "guid-seed";
- public static final String KEY_FRAME_WRITER_MODE = "frame-writer-mode";
- public static final String KEY_DATA_MODE = "data-mode";
public static final String OUTPUT_FORMAT = "output-format";
public static final String OUTPUT_FORMAT_ARECORD = "arecord";
public static final String OUTPUT_FORMAT_ADM_STRING = "adm-string";
- private static final int DEFAULT_DURATION = 60;
+ private static final int DEFAULT_DURATION = 60; //seconds
private static final int DEFAULT_GUID_SEED = 0;
private int duration;
@@ -35,69 +49,35 @@
private OutputStream os;
private DataGenerator dataGenerator = null;
private ByteBuffer outputBuffer = ByteBuffer.allocate(32 * 1024);
- private GULongIDGenerator[] uidGenerators;
- private int numUidGenerators;
- private FrameWriterMode frameWriterMode;
- private DataMode dataMode;
+ private GULongIDGenerator uidGenerator;
public int getTweetCount() {
return tweetCount;
}
- public enum DataMode {
- REUSE_DATA,
- NEW_DATA
- }
-
- public enum FrameWriterMode {
- DUMMY_NO_PARSING,
- PARSING
- }
-
public TweetGenerator(Map<String, String> configuration, int partition, String format, OutputStream os)
throws Exception {
this.partition = partition;
String value = configuration.get(KEY_DURATION);
- duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
-
- value = configuration.get(KEY_DATA_MODE);
- dataMode = value != null ? DataMode.valueOf(value) : DataMode.NEW_DATA;
- numUidGenerators = configuration.get(NUM_KEY_SPACES) != null ? Integer.parseInt(configuration
- .get(NUM_KEY_SPACES)) : 1;
- uidGenerators = new GULongIDGenerator[numUidGenerators];
-
+ this.duration = value != null ? Integer.parseInt(value) : DEFAULT_DURATION;
int guidSeed = configuration.get(KEY_GUID_SEED) != null ? Integer.parseInt(configuration.get(KEY_GUID_SEED))
: DEFAULT_GUID_SEED;
-
- for (int i = 0; i < uidGenerators.length; i++) {
- uidGenerators[i] = new GULongIDGenerator(partition, (byte) (i + guidSeed));
- }
-
- InitializationInfo info = new InitializationInfo();
- dataGenerator = new DataGenerator(info);
- value = configuration.get(KEY_FRAME_WRITER_MODE);
- frameWriterMode = value != null ? FrameWriterMode.valueOf(value.toUpperCase()) : FrameWriterMode.PARSING;
- dataMode = configuration.get(KEY_DATA_MODE) != null ? DataMode.valueOf(configuration.get(KEY_DATA_MODE))
- : DataMode.NEW_DATA;
- tweetIterator = dataGenerator.new TweetMessageIterator(duration, uidGenerators, dataMode);
+ uidGenerator = new GULongIDGenerator(partition, (byte) (guidSeed));
+ dataGenerator = new DataGenerator(new InitializationInfo());
+ tweetIterator = dataGenerator.new TweetMessageIterator(duration, uidGenerator);
this.os = os;
}
private void writeTweetString(TweetMessage tweetMessage) throws IOException {
String tweet = tweetMessage.toString() + "\n";
tweetCount++;
- if (frameWriterMode.equals(FrameWriterMode.PARSING)) {
- byte[] b = tweet.getBytes();
- if (outputBuffer.position() + b.length > outputBuffer.limit()) {
- flush();
- numFlushedTweets += frameTweetCount;
- frameTweetCount = 0;
- outputBuffer.put(b);
- frameTweetCount++;
- } else {
- outputBuffer.put(b);
- frameTweetCount++;
- }
+ byte[] b = tweet.getBytes();
+ if (outputBuffer.position() + b.length > outputBuffer.limit()) {
+ flush(); // the buffer cannot hold this tweet: drain it before appending
+ numFlushedTweets += frameTweetCount;
+ frameTweetCount = 0;
}
+ outputBuffer.put(b); // buffer every tweet, not only the one that triggered a flush
+ frameTweetCount++;
}
@@ -110,13 +90,14 @@
os.write(outputBuffer.array(), 0, outputBuffer.limit());
outputBuffer.position(0);
outputBuffer.limit(32 * 1024);
- tweetIterator.toggleUidKeySpace();
}
public boolean setNextRecordBatch(int numTweetsInBatch) throws Exception {
boolean moreData = tweetIterator.hasNext();
if (!moreData) {
- System.out.println("TWEET COUNT: [" + partition + "]" + tweetCount);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Reached end of batch. Tweet Count: [" + partition + "]" + tweetCount);
+ }
return false;
} else {
int count = 0;
@@ -127,5 +108,4 @@
return true;
}
}
-
}
\ No newline at end of file
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index 7bbfae1..9e3c4dd 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.tools.external.data;
import java.io.IOException;
@@ -54,7 +68,7 @@
return inputStream;
}
- private static class TwitterServer {
+ public static class TwitterServer {
private final DataProvider dataProvider;
private final ExecutorService executorService;
@@ -74,7 +88,7 @@
}
- private static class DataProvider implements Runnable {
+ public static class DataProvider implements Runnable {
public static final String KEY_MODE = "mode";
@@ -116,29 +130,30 @@
long startBatch;
long endBatch;
- while (true) {
- try {
- while (moreData && continuePush) {
- switch (mode) {
- case AGGRESSIVE:
- moreData = tweetGenerator.setNextRecordBatch(batchSize);
- break;
- case CONTROLLED:
- startBatch = System.currentTimeMillis();
- moreData = tweetGenerator.setNextRecordBatch(batchSize);
- endBatch = System.currentTimeMillis();
- if (endBatch - startBatch < 1000) {
- Thread.sleep(1000 - (endBatch - startBatch));
+ try {
+ while (moreData && continuePush) {
+ switch (mode) {
+ case AGGRESSIVE:
+ moreData = tweetGenerator.setNextRecordBatch(batchSize);
+ break;
+ case CONTROLLED:
+ startBatch = System.currentTimeMillis();
+ moreData = tweetGenerator.setNextRecordBatch(batchSize);
+ endBatch = System.currentTimeMillis();
+ if (endBatch - startBatch < 1000) {
+ Thread.sleep(1000 - (endBatch - startBatch));
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to reach the required tps of " + batchSize);
}
- break;
- }
+ }
+ break;
}
- os.close();
- break;
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Exception in adaptor " + e.getMessage());
- }
+ }
+ os.close();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Exception in adaptor " + e.getMessage());
}
}
}
@@ -154,5 +169,9 @@
twitterServer.stop();
}
-
+ @Override
+ public DataExchangeMode getDataExchangeMode() {
+ return DataExchangeMode.PUSH;
+ }
+
}
\ No newline at end of file