
Friday, June 23, 2017

Streaming Twitter Data by Flume using Cloudera Twitter Source

In my previous post, Streaming Twitter Data using Apache Flume, I fetched tweets with Flume and the Twitter Streaming API for data analysis. The Apache TwitterSource converts tweets to Avro format and sends Avro events to the downstream HDFS sink; when the Hive table backed by Avro loaded the data, I got the error "Avro block size is invalid or too large". To overcome this issue, I used the Cloudera TwitterSource rather than the Apache TwitterSource.


Download Cloudera Twitter Source

Download the Cloudera Twitter source from the location below:
https://github.com/cloudera/cdh-twitter-example

Click the "Clone or download" button; it will download a zip file (cdh-twitter-example-master.zip). Unzip it; it contains several folders, including flume-sources and hive-serdes.

After downloading and unzipping, you need to build flume-sources-1.0-SNAPSHOT.jar, which contains the implementation of the Cloudera TwitterSource. You compile it with Maven.

Copy the flume-sources folder to a node where Maven is installed. I placed it in /usr/hadoopsw/.


The flume-sources directory contains a Maven project with a custom Flume source designed to connect to the Twitter Streaming API and ingest tweets in a raw JSON format into HDFS.

Download Maven
If you don't already have Maven, download it (apache-maven-3.5.0-bin.tar.gz) from the location below:


https://maven.apache.org/download.cgi
[hdpsysuser@vbgeneric ~]$ tar -xvf apache-maven-3.5.0-bin.tar.gz
[hdpsysuser@vbgeneric ~]$ cd flume-sources/
[hdpsysuser@vbgeneric flume-sources]$ pwd
/usr/hadoopsw/flume-sources

[hdpsysuser@vbgeneric flume-sources]$ ll
total 12
-rw-r--r-- 1 root root 2224 Jul 14  2016 flume.conf
-rw-r--r-- 1 root root 4108 Jul 14  2016 pom.xml
drwxr-xr-x 3 root root   17 Jun 11 17:49 src

[hdpsysuser@vbgeneric flume-sources]$ /usr/hadoopsw/apache-maven-3.5.0/bin/mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building flume-sources 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ flume-sources ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/flume-sources/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ flume-sources ---
[INFO] Compiling 2 source files to /usr/hadoopsw/flume-sources/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ flume-sources ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/flume-sources/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ flume-sources ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ flume-sources ---
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven-plugin-api/2.0.9/maven-plugin-api-2.0.9.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven-plugin-api/2.0.9/maven-plugin-api-2.0.9.pom (1.5 kB at 559 B/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven/2.0.9/maven-2.0.9.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven/2.0.9/maven-2.0.9.pom (19 kB at 43 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/maven-parent/8/maven-parent-8.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/maven-parent/8/maven-parent-8.pom (24 kB at 67 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-booter/2.12.4/surefire-booter-2.12.4.pom (3.0 kB at 9.7 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom
Downloaded: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/surefire-api/2.12.4/surefire-api-2.12.4.pom (2.5 kB at 8.0 kB/s)
Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/surefire/maven-surefire-common/2.12.4/maven-surefire-common-2.12.4.pom
.....
....

Downloaded: https://repo.maven.apache.org/maven2/asm/asm-util/3.2/asm-util-3.2.jar (37 kB at 32 kB/s)
[INFO] Including org.slf4j:slf4j-api:jar:1.6.1 in the shaded jar.
[INFO] Including org.twitter4j:twitter4j-stream:jar:3.0.5 in the shaded jar.
[INFO] Including org.twitter4j:twitter4j-core:jar:3.0.5 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /usr/hadoopsw/flume-sources/target/flume-sources-1.0-SNAPSHOT.jar with /usr/hadoopsw/flume-sources/target/flume-sources-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 58.692 s
[INFO] Finished at: 2017-06-11T18:04:30-04:00
[INFO] Final Memory: 27M/283M
[INFO] ------------------------------------------------------------------------

This will generate a file called flume-sources-1.0-SNAPSHOT.jar in the target directory.
[hdpsysuser@vbgeneric flume-sources]$ ll
total 24
-rw-rw-r-- 1 hdpsysuser hdpsysuser 10633 Jun 11 18:04 dependency-reduced-pom.xml
-rw-r--r-- 1 hdpsysuser hadoop_grp  2224 Jul 14  2016 flume.conf
-rw-r--r-- 1 hdpsysuser hadoop_grp  4108 Jul 14  2016 pom.xml
drwxr-xr-x 3 hdpsysuser hadoop_grp    17 Jun 11 17:49 src
drwxrwxr-x 5 hdpsysuser hdpsysuser   148 Jun 11 18:04 target

[hdpsysuser@vbgeneric flume-sources]$ cd target/
[hdpsysuser@vbgeneric target]$ ll
total 388
drwxrwxr-x 3 hdpsysuser hdpsysuser     16 Jun 11 18:03 classes
-rw-rw-r-- 1 hdpsysuser hdpsysuser 388883 Jun 11 18:04 flume-sources-1.0-SNAPSHOT.jar
drwxrwxr-x 3 hdpsysuser hdpsysuser     24 Jun 11 18:03 generated-sources
drwxrwxr-x 2 hdpsysuser hdpsysuser     27 Jun 11 18:04 maven-archiver
-rw-rw-r-- 1 hdpsysuser hdpsysuser   7000 Jun 11 18:04 original-flume-sources-1.0-SNAPSHOT.jar

Now copy flume-sources-1.0-SNAPSHOT.jar to $FLUME_HOME/lib
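For example, with the paths used above (adjust them to your environment):

cp /usr/hadoopsw/flume-sources/target/flume-sources-1.0-SNAPSHOT.jar $FLUME_HOME/lib/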

Modify the twitter.conf file
Now change the source type in the twitter.conf file. In Streaming Twitter Data using Apache Flume I used the Apache TwitterSource, i.e.:
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource

Change it to the Cloudera TwitterSource, i.e.:
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource

# Describing/Configuring the source 
#TwitterAgent1.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent1.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
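For reference, here is a minimal twitter.conf sketch using the Cloudera source. It follows the structure of the flume.conf shipped with the cdh-twitter-example project; the credential values are placeholders for your own Twitter app keys, and the keywords and channel sizing are only illustrative:

TwitterAgent1.sources = Twitter
TwitterAgent1.channels = MemChannel
TwitterAgent1.sinks = HDFS

# Cloudera TwitterSource with your Twitter app credentials (placeholders below)
TwitterAgent1.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent1.sources.Twitter.channels = MemChannel
TwitterAgent1.sources.Twitter.consumerKey = <your consumer key>
TwitterAgent1.sources.Twitter.consumerSecret = <your consumer secret>
TwitterAgent1.sources.Twitter.accessToken = <your access token>
TwitterAgent1.sources.Twitter.accessTokenSecret = <your access token secret>
TwitterAgent1.sources.Twitter.keywords = hadoop, bigdata

# In-memory channel; capacities here are illustrative
TwitterAgent1.channels.MemChannel.type = memory
TwitterAgent1.channels.MemChannel.capacity = 10000
TwitterAgent1.channels.MemChannel.transactionCapacity = 100

# HDFS sink writing plain-text JSON events to the path used in this post
TwitterAgent1.sinks.HDFS.type = hdfs
TwitterAgent1.sinks.HDFS.channel = MemChannel
TwitterAgent1.sinks.HDFS.hdfs.path = /flume/twitter
TwitterAgent1.sinks.HDFS.hdfs.fileType = DataStream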

Remove the old files from HDFS and run the agent again to stream Twitter data:
[hdpsysuser@vbgeneric ~]$ hdfs dfs -rm /flume/twitter/*

flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent1
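Once the agent is running, you can verify that events are landing in HDFS:

hdfs dfs -ls /flume/twitter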

The tweets now stream successfully, so cat one of the files to view its contents:

[hdpsysuser@vbgeneric target]$ hdfs dfs -cat /flume/twitter/FlumeData.1497219143017

We will get the error below if we query the Avro-backed Hive table (from the previous post), because the files generated by the Cloudera jar are plain-text JSON:

hive> select * from twitterdata limit 2;
OK
Failed with exception java.io.IOException:java.io.IOException: Not a data file.
Time taken: 0.591 seconds
hive>

Setting up Hive

Build or Download the JSON SerDe
Copy the hive-serdes folder to your desired location, e.g. /usr/hadoopsw/.

The hive-serdes directory contains a Maven project with a JSON SerDe which enables Hive to query raw JSON data.

[hdpsysuser@vbgeneric ~]$ cd /usr/hadoopsw/hive-serdes/
[hdpsysuser@vbgeneric hive-serdes]$ ll
total 4
-rw-r--r-- 1 root root 3840 Jul 14 2016 pom.xml
drwxr-xr-x 3 root root 17 Jun 11 18:39 src

Build the hive-serdes JAR from the hive-serdes directory:
[hdpsysuser@vbgeneric hive-serdes]$ /usr/hadoopsw/apache-maven-3.5.0/bin/mvn package

[INFO] Scanning for projects...

[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building hive-serdes 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
Downloading: https://repository.cloudera.com/artifactory/cloudera-repos/org/codehaus/jackson/jackson-core-asl/1.9.8/jackson-core-asl-1.9.8.pom
Downloading: https://repo.maven.apache.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.8/jackson-core-asl-1.9.8.pom
...
...
Downloaded: https://repo.maven.apache.org/maven2/org/apache/avro/avro-ipc/1.7.3/avro-ipc-1.7.3-tests.jar (264 kB at 74 kB/s)
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ hive-serdes ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/hive-serdes/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ hive-serdes ---
[INFO] Compiling 1 source file to /usr/hadoopsw/hive-serdes/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ hive-serdes ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /usr/hadoopsw/hive-serdes/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ hive-serdes ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ hive-serdes ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ hive-serdes ---
[INFO] Building jar: /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-shade-plugin:1.7.1:shade (default) @ hive-serdes ---
[INFO] Including org.codehaus.jackson:jackson-core-asl:jar:1.9.8 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar with /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:09 min
[INFO] Finished at: 2017-06-11T18:45:07-04:00
[INFO] Final Memory: 25M/211M
[INFO] ------------------------------------------------------------------------

This will generate a file called hive-serdes-1.0-SNAPSHOT.jar in the target directory.
Copy hive-serdes-1.0-SNAPSHOT.jar to $HIVE_HOME/lib
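For example, with the paths used above; alternatively, you can ADD JAR per Hive session instead of copying it:

cp /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar $HIVE_HOME/lib/

hive> ADD JAR /usr/hadoopsw/hive-serdes/target/hive-serdes-1.0-SNAPSHOT.jar;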


Run hive, and create the table using the following command:

CREATE EXTERNAL TABLE tweets_data (
   id BIGINT,
   created_at STRING,
   source STRING,
   favorited BOOLEAN,
   retweeted_status STRUCT<
     text:STRING,
     twitter_user:STRUCT<screen_name:STRING,name:STRING>,
     retweet_count:INT>,
   entities STRUCT<
     urls:ARRAY<STRUCT<expanded_url:STRING>>,
     user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
     hashtags:ARRAY<STRUCT<text:STRING>>>,
   text STRING,
   twitter_user STRUCT<
     screen_name:STRING,
     name:STRING,
     friends_count:INT,
     followers_count:INT,
     statuses_count:INT,
     verified:BOOLEAN,
     utc_offset:INT,
     time_zone:STRING>,
   in_reply_to_screen_name STRING
 ) 
 PARTITIONED BY (datehour INT)
 ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
 LOCATION '/flume/twitter';
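Because the table is partitioned by datehour, Hive will not see the files until a partition is registered. If, as in this post, the files land directly under /flume/twitter, you can map a partition to that directory; the datehour value below is arbitrary, for illustration only:

ALTER TABLE tweets_data ADD PARTITION (datehour = 2017062300) LOCATION '/flume/twitter';
SELECT text, twitter_user.screen_name FROM tweets_data LIMIT 5;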

If you don't want to use the JSON SerDe, you can create an external table as below and then use the Hive get_json_object function to parse the tweets.

hive> CREATE EXTERNAL TABLE RAW_TWEETS(json_response STRING)  STORED AS TEXTFILE  LOCATION '/flume/twitter';

hive> select * from RAW_TWEETS limit 10;


Parse tweets

CREATE VIEW vw_parsed_tweets AS
SELECT
  CAST(get_json_object(json_response, '$.id') as BIGINT) as ID,
  get_json_object(json_response, '$.created_at') as CREATED_AT,
  get_json_object(json_response, '$.text') as TEXT,
  get_json_object(json_response, '$.retweeted') as RETWEETED,
  get_json_object(json_response, '$.coordinates') COORDINATES,
  get_json_object(json_response, '$.source') SOURCE,
  CAST(get_json_object(json_response, '$.retweet_count') as INT) RETWEET_COUNT,
  get_json_object(json_response, '$.entities.urls[0].display_url') DISPLAY_URL,
  get_json_object(json_response, '$.user.screen_name') USER_SCREEN_NAME,
  get_json_object(json_response, '$.user.name') USER_NAME,
  CAST(get_json_object(json_response, '$.user.followers_count') as INT) FOLLOWER_COUNT,
  CAST(get_json_object(json_response, '$.user.listed_count') as INT) LISTED_COUNT,
  CAST(get_json_object(json_response, '$.user.friends_count') as INT) FRIENDS_COUNT,
  get_json_object(json_response, '$.user.lang') USER_LANG,
  get_json_object(json_response, '$.user.location') USER_LOCATION,
  get_json_object(json_response, '$.user.time_zone') USER_TZ,
  get_json_object(json_response, '$.user.profile_image_url') PROFILE_IMAGE_URL
FROM raw_tweets;
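A quick sanity check against the parsed view, using the column aliases defined above:

SELECT user_screen_name, created_at, text FROM vw_parsed_tweets LIMIT 5;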

Optional: Create view in PrestoDB

create or replace view hive.flume.vw_tweets as
SELECT
cast(json_extract_scalar(json_response, '$.created_at') as varchar) created_at,
json_extract_scalar(json_response, '$.source') source,
cast(json_extract_scalar(json_response, '$.retweet_count') as bigint) retweet_count,
json_extract_scalar(json_response, '$.retweeted') retweeted,
json_extract_scalar(json_response, '$.id') id,
json_extract_scalar(json_response, '$.text') text,
json_extract_scalar(json_response, '$.lang') lang,
json_extract_scalar(json_response, '$.favorited') favorited,
json_extract_scalar(json_response, '$.possibly_sensitive') possibly_sensitive,
json_extract_scalar(json_response, '$.coordinates') coordinates,
json_extract_scalar(json_response, '$.truncated') truncated,
json_extract_scalar(json_response, '$.timestamp_ms') timestamp_ms,
json_extract_scalar(json_response, '$.entities.urls[0].display_url') display_url,
json_extract_scalar(json_response, '$.entities.urls[0].expanded_url') expanded_url,
json_extract_scalar(json_response, '$.entities.urls[0].url') url,
cast(json_extract_scalar(json_response, '$.user.friends_count') as bigint) user_friends_count,
json_extract_scalar(json_response, '$.user.profile_image_url_https') user_profile_image_url_https,
cast(json_extract_scalar(json_response, '$.user.listed_count') as bigint) user_listed_count,
json_extract_scalar(json_response, '$.user.profile_background_image_url') user_profile_background_image_url,
cast(json_extract_scalar(json_response, '$.user.favourites_count') as bigint) user_favourites_count,
json_extract_scalar(json_response, '$.user.description') user_description,
json_extract_scalar(json_response, '$.user.created_at') user_created_at,
json_extract_scalar(json_response, '$.user.profile_background_image_url_https') user_profile_background_image_url_https,
json_extract_scalar(json_response, '$.user.protected') user_protected,
json_extract_scalar(json_response, '$.user.id') user_id,
json_extract_scalar(json_response, '$.user.geo_enabled') user_geo_enabled,
json_extract_scalar(json_response, '$.user.lang') user_lang,
json_extract_scalar(json_response, '$.user.verified') user_verified,
json_extract_scalar(json_response, '$.user.time_zone') user_time_zone,
json_extract_scalar(json_response, '$.user.url') user_url,
json_extract_scalar(json_response, '$.user.contributors_enabled') user_contributors_enabled,
cast(json_extract_scalar(json_response, '$.user.statuses_count') as bigint) user_statuses_count,
cast(json_extract_scalar(json_response, '$.user.followers_count') as bigint) user_followers_count,
json_extract_scalar(json_response, '$.user.name') user_name,
json_extract_scalar(json_response, '$.user.location') user_location
FROM
hive.flume.raw_tweets;
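You can then query the view from the Presto CLI; the server address below is illustrative, so point it at your own coordinator:

presto --server localhost:8080 --catalog hive --schema flume
presto:flume> select user_name, text from vw_tweets limit 5;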
