In my previous post, Streaming Twitter Data using Apache Flume, tweets were fetched with Flume and the Twitter Streaming API for data analysis. The Apache TwitterSource converts tweets to Avro format and sends the Avro events to the downstream HDFS sink, but when the Hive table backed by Avro loaded the data, I got the error message "Avro block size is invalid or too large". To overcome this issue, I used the Cloudera TwitterSource rather than the Apache TwitterSource.
Download Cloudera Twitter Source
Download the Cloudera Twitter source from the location below:
https://github.com/cloudera/cdh-twitter-example
Click the "Clone or download" button; it will download a zip file (cdh-twitter-example-master.zip).
Unzip this file; it contains several folders, including flume-sources and hive-serdes.
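Alternatively, if git is available on your machine you can clone the repository instead of downloading the zip; the result is the same set of folders:
git clone https://github.com/cloudera/cdh-twitter-example.git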
After downloading and unzipping, compile flume-sources-1.0-SNAPSHOT.jar. This jar contains the implementation of the Cloudera TwitterSource, and you should compile it using Maven.
The unzipped archive has a folder flume-sources. Copy it to the node where Maven is installed; I placed this folder in /usr/hadoopsw/.
The flume-sources directory contains a Maven project with a custom Flume source designed to connect to the Twitter Streaming API and ingest tweets in raw JSON format into HDFS.
Download Maven
If you don't have Maven already, download it (apache-maven-3.5.0-bin.tar.gz) from the location below:
https://maven.apache.org/download.cgi
[hdpsysuser@vbgeneric ~]$ tar -xvf apache-maven-3.5.0-bin.tar.gz
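Optionally, add Maven's bin directory to your PATH so mvn can be invoked without the full path. This is a convenience only and assumes Maven was extracted under /usr/hadoopsw, as used later in this post:
[hdpsysuser@vbgeneric ~]$ export PATH=/usr/hadoopsw/apache-maven-3.5.0/bin:$PATH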
[hdpsysuser@vbgeneric ~]$ cd flume-sources/
[hdpsysuser@vbgeneric flume-sources]$ pwd
/usr/hadoopsw/flume-sources
[hdpsysuser@vbgeneric flume-sources]$ ll
total 12
-rw-r--r-- 1 root root 2224 Jul 14 2016 flume.conf
-rw-r--r-- 1 root root 4108 Jul 14 2016 pom.xml
drwxr-xr-x 3 root root 17 Jun 11 17:49 src
[hdpsysuser@vbgeneric flume-sources]$ /usr/hadoopsw/apache-maven-3.5.0/bin/mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building flume-sources 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ flume-sources ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/flume-sources/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ flume-sources ---
[INFO] Compiling 2 source files to /usr/hadoopsw/flume-sources/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ flume-sources ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/flume-sources/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ flume-sources ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ flume-sources ---
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven-plugin-api/2.0.9/maven-plugin-api-2.0.9.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven-plugin-api/2.0.9/maven-plugin-api-2.0.9.pom (1.5 kB at 559 B/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven/2.0.9/maven-2.0.9.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven/2.0.9/maven-2.0.9.pom (19 kB at 43 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven-parent/8/maven-parent-8.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven-parent/8/maven-parent-8.pom (24 kB at 67 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom (3.0 kB at 9.7 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom (2.5 kB at 8.0 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/maven-surefire-common/2.12.4/maven-surefire-common-2.12.4.pom
.....
....
Downloaded: https://repo.maven.apache.org/maven2/asm/asm-util/3.2/asm-util-3.2.jar (37 kB at 32 kB/s)
[INFO] Including org.slf4j:slf4j-api:jar:1.6.1 in the shaded jar.
[INFO] Including org.twitter4j:twitter4j-stream:jar:3.0.5 in the shaded jar.
[INFO] Including org.twitter4j:twitter4j-core:jar:3.0.5 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /usr/hadoopsw/flume-sources/target/flume-sources-1.0-SNAPSHOT.jar with /usr/hadoopsw/flume-sources/target/flume-sources-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 58.692 s
[INFO] Finished at: 2017-06-11T18:04:30-04:00
[INFO] Final Memory: 27M/283M
[INFO] ------------------------------------------------------------------------
[hdpsysuser@vbgeneric flume-sources]$ ll
total 24
-rw-rw-r-- 1 hdpsysuser hdpsysuser 10633 Jun 11 18:04 dependency-reduced-pom.xml
-rw-r--r-- 1 hdpsysuser hadoop_grp 2224 Jul 14 2016 flume.conf
-rw-r--r-- 1 hdpsysuser hadoop_grp 4108 Jul 14 2016 pom.xml
drwxr-xr-x 3 hdpsysuser hadoop_grp 17 Jun 11 17:49 src
drwxrwxr-x 5 hdpsysuser hdpsysuser 148 Jun 11 18:04 target
[hdpsysuser@vbgeneric flume-sources]$ cd target/
[hdpsysuser@vbgeneric target]$ ll
total 388
drwxrwxr-x 3 hdpsysuser hdpsysuser 16 Jun 11 18:03 classes
-rw-rw-r-- 1 hdpsysuser hdpsysuser 388883 Jun 11 18:04 flume-sources-1.0-SNAPSHOT.jar
drwxrwxr-x 3 hdpsysuser hdpsysuser 24 Jun 11 18:03 generated-sources
drwxrwxr-x 2 hdpsysuser hdpsysuser 27 Jun 11 18:04 maven-archiver
-rw-rw-r-- 1 hdpsysuser hdpsysuser 7000 Jun 11 18:04 original-flume-sources-1.0-SNAPSHOT.jar
This will generate a file called flume-sources-1.0-SNAPSHOT.jar in the target directory. Now copy flume-sources-1.0-SNAPSHOT.jar to $FLUME_HOME/lib.
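For example, assuming the target directory above and that $FLUME_HOME is set:
[hdpsysuser@vbgeneric target]$ cp flume-sources-1.0-SNAPSHOT.jar $FLUME_HOME/lib/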
Modify twitter.conf file
Now change the source type in the twitter.conf file. As explained in Streaming Twitter Data using Apache Flume, I had used the Apache TwitterSource, i.e.:
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
Change it to the Cloudera TwitterSource, i.e.:
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
# Describing/Configuring the source
#TwitterAgent1.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent1.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
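For reference, below is a minimal sketch of a complete twitter.conf for the HDFS agent. The agent, channel, and sink names, the HDFS path, and the keyword are assumptions based on the other examples in this post; substitute the values and credentials from your own twitter.conf:
# Minimal sketch of twitter.conf (names and values are assumptions)
TwitterAgent1.sources = Twitter
TwitterAgent1.channels = MemChannel
TwitterAgent1.sinks = HDFS
# Source: Cloudera TwitterSource
TwitterAgent1.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent1.sources.Twitter.channels = MemChannel
TwitterAgent1.sources.Twitter.consumerKey = <your consumer key>
TwitterAgent1.sources.Twitter.consumerSecret = <your consumer secret>
TwitterAgent1.sources.Twitter.accessToken = <your access token>
TwitterAgent1.sources.Twitter.accessTokenSecret = <your access token secret>
TwitterAgent1.sources.Twitter.keywords = pakistan
# Sink: write the raw JSON events as text files into HDFS
TwitterAgent1.sinks.HDFS.type = hdfs
TwitterAgent1.sinks.HDFS.channel = MemChannel
TwitterAgent1.sinks.HDFS.hdfs.path = /flume/twitter
TwitterAgent1.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent1.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent1.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent1.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent1.sinks.HDFS.hdfs.rollCount = 10000
# Channel: in-memory buffer between source and sink
TwitterAgent1.channels.MemChannel.type = memory
TwitterAgent1.channels.MemChannel.capacity = 10000
TwitterAgent1.channels.MemChannel.transactionCapacity = 1000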
Remove the files in HDFS and run the agent again to stream Twitter data:
[hdpsysuser@vbgeneric ~]$ hdfs dfs -rm /flume/twitter/*
flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent1
Tweets should now stream successfully, so cat one of the files to view its contents.
[hdpsysuser@vbgeneric target]$ hdfs dfs -cat /flume/twitter/FlumeData.1497219143017
We will get the error below if we query the Hive table backed by Avro (created in the previous post), because the file generated by the Cloudera jar is plain-text JSON:
hive> select * from twitterdata limit 2;
OK
Failed with exception java.io.IOException:java.io.IOException: Not a data file.
Time taken: 0.591 seconds
hive>
Setting up Hive
Build or download the JSON SerDe. Copy the hive-serdes folder to your desired location, e.g. /usr/hadoopsw/.
The hive-serdes directory contains a Maven project with a JSON SerDe which enables Hive to query raw JSON data.
[hdpsysuser@vbgeneric ~]$ cd /usr/hadoopsw/hive-serdes/
[hdpsysuser@vbgeneric hive-serdes]$ ll
total 4
-rw-r--r-- 1 root root 3840 Jul 14 2016 pom.xml
drwxr-xr-x 3 root root 17 Jun 11 18:39 src
Build the hive-serdes JAR from the root of the hive-serdes directory:
[hdpsysuser@vbgeneric hive-serdes]$ /usr/hadoopsw/apache-maven-3.5.0/bin/mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building hive-serdes 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
Downloading: https://repository.cloudera.com/artifactory/cloudera-repos/org/codehaus/jackson/jackson-core-asl/1.9.8/jackson-core-asl-1.9.8.pom
Downloading: https://repo.maven.apache.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.8/jackson-core-asl-1.9.8.pom
...
...
Downloaded: https://repo.maven.apache.org/maven2/org/apache/avro/avro-ipc/1.7.3/avro-ipc-1.7.3-tests.jar (264 kB at 74 kB/s)
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ hive-serdes ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/hive-serdes/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ hive-serdes ---
[INFO] Compiling 1 source file to /usr/hadoopsw/hive-serdes/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ hive-serdes ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/hive-serdes/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ hive-serdes ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ hive-serdes ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ hive-serdes ---
[INFO] Building jar: /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-shade-plugin:1.7.1:shade (default) @ hive-serdes ---
[INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9.8 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar with /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:09 min
[INFO] Finished at: 2017-06-11T18:45:07-04:00
[INFO] Final Memory: 25M/211M
[INFO] ------------------------------------------------------------------------
This will generate a file called hive-serdes-1.0-SNAPSHOT.jar in the target directory. Copy hive-serdes-1.0-SNAPSHOT.jar to $HIVE_HOME/lib.
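For example, assuming the build directory above and that $HIVE_HOME points to your Hive installation:
[hdpsysuser@vbgeneric hive-serdes]$ cp target/hive-serdes-1.0-SNAPSHOT.jar $HIVE_HOME/lib/
If you prefer not to copy into the Hive lib directory, the jar can also be added per session:
hive> ADD JAR /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar;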
Run Hive and create the table using the following statement:
CREATE EXTERNAL TABLE tweets_data (
id BIGINT,
created_at STRING,
source STRING,
favorited BOOLEAN,
retweeted_status STRUCT<
text:STRING,
twitter_user:STRUCT<screen_name:STRING,name:STRING>,
retweet_count:INT>,
entities STRUCT<
urls:ARRAY<STRUCT<expanded_url:STRING>>,
user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
hashtags:ARRAY<STRUCT<text:STRING>>>,
text STRING,
twitter_user STRUCT<
screen_name:STRING,
name:STRING,
friends_count:INT,
followers_count:INT,
statuses_count:INT,
verified:BOOLEAN,
utc_offset:INT,
time_zone:STRING>,
in_reply_to_screen_name STRING
)
PARTITIONED BY (datehour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/flume/twitter';
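Note that tweets_data is partitioned by datehour, so Hive will not show any rows until a partition is registered. A hedged sketch, assuming the agent writes into hourly subdirectories under /flume/twitter (the partition value and path below are illustrative only):
hive> ALTER TABLE tweets_data ADD IF NOT EXISTS PARTITION (datehour = 2017061118) LOCATION '/flume/twitter/2017/06/11/18';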
If you don't want to use the JSON SerDe, you can create an external table as below and then use Hive's get_json_object function to parse the tweets.
hive> CREATE EXTERNAL TABLE RAW_TWEETS(json_response STRING) STORED AS TEXTFILE LOCATION '/flume/twitter';
hive> select * from RAW_TWEETS limit 10;
Parse tweets
CREATE VIEW vw_parsed_tweets as
SELECT
CAST(get_json_object(json_response, '$.id') as BIGINT) as ID,
get_json_object(json_response, '$.created_at') as CREATED_AT,
get_json_object(json_response, '$.text') as TEXT,
get_json_object(json_response, '$.retweeted') as RETWEETED,
get_json_object(json_response, '$.coordinates') COORDINATES,
get_json_object(json_response, '$.source') SOURCE,
CAST(get_json_object(json_response, '$.retweet_count') as INT) RETWEET_COUNT,
get_json_object(json_response, '$.entities.urls[0].display_url') DISPLAY_URL,
get_json_object(json_response, '$.user.screen_name') USER_SCREEN_NAME,
get_json_object(json_response, '$.user.name') USER_NAME,
CAST(get_json_object(json_response, '$.user.followers_count') as INT) FOLLOWER_COUNT,
CAST(get_json_object(json_response, '$.user.listed_count') as INT) LISTED_COUNT,
CAST(get_json_object(json_response, '$.user.friends_count') as INT) FRIENDS_COUNT,
get_json_object(json_response, '$.user.lang') USER_LANG,
get_json_object(json_response, '$.user.location') USER_LOCATION,
get_json_object(json_response, '$.user.time_zone') USER_TZ,
get_json_object(json_response, '$.user.profile_image_url') PROFILE_IMAGE_URL
from raw_tweets;
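The view can then be queried like any other table; for example, to peek at a few parsed tweets:
hive> SELECT user_name, retweet_count, text FROM vw_parsed_tweets LIMIT 10;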
Optional: Create view in PrestoDB
create or replace view hive.flume.vw_tweets as
SELECT
cast(json_extract_scalar(json_response, '$.created_at') as varchar) created_at,
json_extract_scalar(json_response, '$.source') source,
cast(json_extract_scalar(json_response, '$.retweet_count') as bigint) retweet_count,
json_extract_scalar(json_response, '$.retweeted') retweeted,
json_extract_scalar(json_response, '$.id') id,
json_extract_scalar(json_response, '$.text') text,
json_extract_scalar(json_response, '$.lang') lang,
json_extract_scalar(json_response, '$.favorited') favorited,
json_extract_scalar(json_response, '$.possibly_sensitive') possibly_sensitive,
json_extract_scalar(json_response, '$.coordinates') coordinates,
json_extract_scalar(json_response, '$.truncated') truncated,
json_extract_scalar(json_response, '$.timestamp_ms') timestamp_ms,
json_extract_scalar(json_response, '$.entities.urls[0].display_url') display_url,
json_extract_scalar(json_response, '$.entities.urls[0].expanded_url') expanded_url,
json_extract_scalar(json_response, '$.entities.urls[0].url') url,
cast(json_extract_scalar(json_response, '$.user.friends_count') as bigint) user_friends_count,
json_extract_scalar(json_response, '$.user.profile_image_url_https') user_profile_image_url_https,
cast(json_extract_scalar(json_response, '$.user.listed_count') as bigint) user_listed_count,
json_extract_scalar(json_response, '$.user.profile_background_image_url') user_profile_background_image_url,
cast(json_extract_scalar(json_response, '$.user.favourites_count') as bigint) user_favourites_count,
json_extract_scalar(json_response, '$.user.description') user_description,
json_extract_scalar(json_response, '$.user.created_at') user_created_at,
json_extract_scalar(json_response, '$.user.profile_background_image_url_https') user_profile_background_image_url_https,
json_extract_scalar(json_response, '$.user.protected') user_protected,
json_extract_scalar(json_response, '$.user.id') user_id,
json_extract_scalar(json_response, '$.user.geo_enabled') user_geo_enabled,
json_extract_scalar(json_response, '$.user.lang') user_lang,
json_extract_scalar(json_response, '$.user.verified') user_verified,
json_extract_scalar(json_response, '$.user.time_zone') user_time_zone,
json_extract_scalar(json_response, '$.user.url') user_url,
json_extract_scalar(json_response, '$.user.contributors_enabled') user_contributors_enabled,
cast(json_extract_scalar(json_response, '$.user.statuses_count') as bigint) user_statuses_count,
cast(json_extract_scalar(json_response, '$.user.followers_count') as bigint) user_followers_count,
json_extract_scalar(json_response, '$.user.name') user_name,
json_extract_scalar(json_response, '$.user.location') user_location
FROM
hive.flume.raw_tweets
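Once created, the view can be queried from the Presto CLI. For example, to see the language distribution of the streamed tweets (this assumes hive.flume.raw_tweets is the RAW_TWEETS external table created earlier, registered under the flume schema of the hive catalog):
presto> SELECT lang, count(*) AS tweets FROM hive.flume.vw_tweets GROUP BY lang ORDER BY tweets DESC LIMIT 10;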
Streaming to HBase
You can also stream the tweets to HBase using the AsyncHBaseSink. An example configuration is given below.
1- Create a table in HBase
hbase(main):040:0* create 'json_data','cf'
0 row(s) in 2.3020 seconds
=> Hbase::Table - json_data
Set the required environment variables for the flume user:
[flume@dn04 ~]$ export HIVE_HOME=/usr/hdp/current/hive-server2
[flume@dn04 ~]$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
[flume@dn04 ~]$ export FLUME_HOME=/usr/hdp/2.6.1.0-129/flume
2- Agent Configuration File
hbase_agent.conf
# Naming the components on the current agent.
hbase_agent1.sources = Twitter
hbase_agent1.channels = MemChannelHBase
hbase_agent1.sinks = HBASESINK
# Describing/Configuring the source
hbase_agent1.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
hbase_agent1.sources.Twitter.consumerKey = NZTCCsoEEZEpnvzcguFUYvvlmy
hbase_agent1.sources.Twitter.consumerSecret = odBgVnMd8xDoFzNh56CyKH6l4Q9uyJE24yGKfRZyy3uNGlWyqqI
hbase_agent1.sources.Twitter.accessToken = 522008992-jcg9PYx74FVw7fkdPk8jLYgwpIJ69Kedft1VdTsR
hbase_agent1.sources.Twitter.accessTokenSecret = KFfGh1Uu1jSvXkZTy6FJcnJIxdfuBOGTLUHlyyuLwACqEI
hbase_agent1.sources.Twitter.keywords = pakistan
# Describing/Configuring the sink
#Use the AsyncHBaseSink
hbase_agent1.sinks.HBASESINK.type = org.apache.flume.sink.hbase.AsyncHBaseSink
hbase_agent1.sinks.HBASESINK.channel = MemChannelHBase
hbase_agent1.sinks.HBASESINK.table = json_data
hbase_agent1.sinks.HBASESINK.columnFamily = cf
hbase_agent1.sinks.HBASESINK.column = charges
hbase_agent1.sinks.HBASESINK.batchSize = 5000
#Use the SimpleAsyncHbaseEventSerializer that comes with Flume
hbase_agent1.sinks.HBASESINK.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
hbase_agent1.sinks.HBASESINK.serializer.incrementColumn = icol
# Describing/Configuring the channel
hbase_agent1.channels.MemChannelHBase.type=memory
hbase_agent1.channels.MemChannelHBase.capacity=10000
hbase_agent1.channels.MemChannelHBase.transactionCapacity=1000
# Binding the source and sink to the channel
hbase_agent1.sources.Twitter.channels = MemChannelHBase
hbase_agent1.sinks.HBASESINK.channel = MemChannelHBase
3- Run Agent
[flume@dn04 ~]$ flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/hbase_agent.conf -Dflume.root.logger=DEBUG,console -n hbase_agent1
4- Test from HBase Shell
hbase(main):025:0> count 'json_data'
Current count: 1000, row: default4f0868fe-0860-4cca-b2ea-8d64b386c658
Current count: 2000, row: defaulta2d5f305-3949-462b-9094-944fa517629e
Current count: 3000, row: defaultf0ae4d91-5325-4eb1-81b6-bce5c47b1755
3177 row(s) in 0.4750 seconds
=> 3177
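To inspect what the SimpleAsyncHbaseEventSerializer actually stored, you can scan a row or two from the HBase shell (a quick sanity check, not a required step):
hbase(main):026:0> scan 'json_data', {LIMIT => 2}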