Note: All the posts are based on practical approach avoiding lengthy theory. All have been tested on some development servers. Please don’t test any post on production servers until you are sure.

Saturday, June 24, 2017

Installing/Configuring and working with Apache Kafka


Introduction

Apache Kafka is an open source, distributed publish-subscribe messaging system, designed mainly for persistent messaging, high throughput, support for multiple clients, and real-time message visibility to consumers.

Kafka addresses the real-time problems of a software solution, that is, dealing with real-time volumes of information and routing it to multiple consumers quickly. Kafka provides seamless integration between producers and consumers of information without blocking the producers, and without letting producers know who the final consumers are. It also supports parallel data loading into Hadoop systems.


It was written in Scala and Java, originated at LinkedIn, and became an open source Apache project in 2011.


Terminologies

Topic: A stream of messages belonging to a particular category is called a topic. Data is stored in topics.

Partition: Topics are split into partitions. For each topic, Kafka keeps a minimum of one partition. Each partition contains messages in an immutable, ordered sequence. A partition is implemented as a set of segment files of equal size.

Partition offset: Each message in a partition has a unique sequence id called the "offset".


Replicas of partition: Replicas are backups of a partition.

Brokers: Brokers are simple systems responsible for maintaining the published data.

Kafka Cluster: A Kafka installation with more than one broker is called a Kafka cluster.

Producers: Producers are the publishers of messages to one or more Kafka topics. Producers send data to Kafka brokers. Every time a producer publishes a message to a broker, the broker simply appends the message to the last segment file of a partition. Producers can also send messages to a partition of their choice.

Consumers: Consumers read data from brokers. Consumers subscribe to one or more topics and consume published messages by pulling data from the brokers.

Leader: The leader is the node responsible for all reads and writes for a given partition. Every partition has one server acting as its leader.

Follower: A node that follows the leader's instructions is called a follower. If the leader fails, one of the followers automatically becomes the new leader. A follower acts as a normal consumer: it pulls messages and updates its own data store.


Kafka Cluster Architecture


A Kafka cluster typically consists of multiple brokers to maintain load balance. Kafka brokers are stateless, so they use ZooKeeper for maintaining their cluster state. One Kafka broker instance can handle hundreds of thousands of reads and writes per second, and each broker can handle terabytes of messages without performance impact.





ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service is mainly used to notify producers and consumers about the presence of a new broker in the Kafka system or the failure of a broker. Based on the notification received from ZooKeeper about the presence or failure of a broker, producers and consumers take a decision and start coordinating their work with some other broker.



Producers push data to brokers. When a new broker is started, all the producers discover it and automatically start sending messages to that new broker. Depending on its acks setting, a Kafka producer need not wait for acknowledgements from the broker and can send messages as fast as the broker can handle.



Because Kafka brokers are stateless, the consumer has to keep track of how many messages it has consumed by using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is tracked through ZooKeeper.
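
To make the offset idea above concrete, here is a toy sketch in plain shell. It is not how Kafka stores data; it only models one partition as an append-only file in which the offset of a message is its zero-based line number. PARTITION_FILE, append_message and read_from_offset are names I made up for illustration.

```shell
#!/bin/sh
# Toy model of a single partition: an append-only file where the
# offset of a message is its zero-based line number.
# PARTITION_FILE is a hypothetical name, not a real Kafka path.
PARTITION_FILE=$(mktemp)

# Append one message to the end of the log (what a broker does on publish).
append_message() {
    printf '%s\n' "$1" >> "$PARTITION_FILE"
}

# Print every message from the given offset onwards (a consumer pull).
read_from_offset() {
    tail -n +"$(( $1 + 1 ))" "$PARTITION_FILE"
}

append_message "first"
append_message "second"
append_message "third"

# A consumer that has already processed offsets 0 and 1 resumes at offset 2.
read_from_offset 2

# Rewinding is nothing more than asking for an earlier offset again.
read_from_offset 0
```

The point of the sketch is that the log itself never changes when a consumer reads; only the offset the consumer remembers moves forward or backward.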


Apache Kafka - Installation Steps

Step-1: Java Installation


Step-2: ZooKeeper Installation
To install the ZooKeeper framework on your machine, visit the following link and download the latest version of ZooKeeper.



http://zookeeper.apache.org/releases.html

Extract the tar file and change ownership if required
[root@en01 hadoopsw]# tar -zxf zookeeper-3.4.9.tar.gz

[root@en01 hadoopsw]# chown -R hdpclient:hadoop_edge zookeeper-3.4.9

Create Configuration File

Open the configuration file named conf/zoo.cfg and add the following parameters as a starting point.


[hdpclient@en01 zookeeper-3.4.9]$ vi conf/zoo.cfg
[hdpclient@en01 zookeeper-3.4.9]$ cat conf/zoo.cfg
tickTime=2000
dataDir=~/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

tickTime
the basic time unit in milliseconds used by ZooKeeper. It is used to do heartbeats and the minimum session timeout will be twice the tickTime.

dataDir
the location to store the in-memory database snapshots and, unless specified otherwise, the transaction log of updates to the database.

clientPort
the port to listen for client connections


Create the directory to hold the ZooKeeper data, as mentioned in the configuration file
[hdpclient@en01 zookeeper-3.4.9]$ mkdir -p  ~/zookeeper/data
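
One caveat on the dataDir value above: zoo.cfg is read as a Java properties file and not by a shell, so as far as I know the ~ is not expanded to your home directory, and the data may end up in a directory literally named ~ under the directory ZooKeeper was started from. Below is a minimal sketch that writes the same settings with an absolute path instead. ZK_DATA_DIR and ZK_CFG are names I made up for illustration, and the sketch writes to a temporary file unless you point ZK_CFG at your real conf/zoo.cfg.

```shell
#!/bin/sh
# ZK_DATA_DIR and ZK_CFG are illustrative names; adjust them to your layout.
ZK_DATA_DIR="$HOME/zookeeper/data"   # the shell expands $HOME here
ZK_CFG="${ZK_CFG:-$(mktemp)}"        # set ZK_CFG=conf/zoo.cfg for real use

# The heredoc delimiter is unquoted, so $ZK_DATA_DIR is expanded
# to an absolute path before the file is written.
cat > "$ZK_CFG" <<EOF
tickTime=2000
dataDir=$ZK_DATA_DIR
clientPort=2181
initLimit=5
syncLimit=2
EOF

# Show the line ZooKeeper will actually read.
grep '^dataDir=' "$ZK_CFG"
```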

Once the configuration file has been saved and you are back at the terminal, you can start the ZooKeeper server.


Start ZooKeeper Server

[hdpclient@en01 zookeeper-3.4.9]$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[hdpclient@en01 zookeeper-3.4.9]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: standalone

Connecting to ZooKeeper


[hdpclient@en01 zookeeper-3.4.9]$ bin/zkCli.sh
After typing the above command, you will be connected to the zookeeper server and will get the below prompt.

[zk: localhost:2181(CONNECTED) 0]

[zk: localhost:2181(CONNECTED) 0] quit
Quitting...
2017-06-13 16:16:18,815 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x15ca17cf7690001 closed
2017-06-13 16:16:18,817 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@519] - EventThread shut down for session: 0x15ca17cf7690001

[hdpclient@en01 zookeeper-3.4.9]$ bin/zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED


Step-3: Apache Kafka Installation

To install Kafka on your machine, download the latest Kafka release from the location below.

https://kafka.apache.org/downloads.html
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz



Extract the tar file

[root@en01 hadoopsw]# tar -zxf kafka_2.12-0.10.2.0.tgz


Change ownership if required, e.g. I've created a user kafka for the Kafka binaries
useradd kafka
passwd kafka

[root@en01 hadoopsw]# chown -R kafka kafka_2.12-0.10.2.0


Edit bash_profile and add below variables
vi ~/.bash_profile

### ZooKeeper Variables
export ZOOKEEPER_HOME=/usr/hadoopsw/zookeeper-3.4.9
export PATH=$PATH:$ZOOKEEPER_HOME/bin

### Kafka Variables
export KAFKA_HOME=/usr/hadoopsw/kafka_2.12-0.10.2.0
export PATH=$PATH:$KAFKA_HOME/bin

Run the command below for the changes to take effect
[hdpclient@en01 ~]$ source ~/.bash_profile

Now you can start the Kafka server. ZooKeeper should be running before the Kafka server is started.



[hdpclient@en01 ~]$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED


[kafka@en01 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
...
...
[2017-06-14 15:45:14,470] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-06-14 15:45:14,513] INFO Kafka version : 0.10.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-14 15:45:14,514] INFO Kafka commitId : 576d93a8dc0cf421 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-14 15:45:14,514] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
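
The "ZooKeeper first, then Kafka" ordering above can also be scripted. The following is only a sketch: wait_for is a hypothetical helper of mine that retries any command once per second, and the commented usage line assumes that zkServer.sh status returns a non-zero exit code while ZooKeeper is not yet up, which you should verify on your version.

```shell
#!/bin/sh
# wait_for ATTEMPTS COMMAND [ARGS...]
# Retries COMMAND once per second until it succeeds or ATTEMPTS is used up.
# Returns 0 on success and 1 if the command never succeeded.
wait_for() {
    wf_attempts=$1
    shift
    wf_try=1
    while [ "$wf_try" -le "$wf_attempts" ]; do
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        wf_try=$((wf_try + 1))
        sleep 1
    done
    return 1
}

# Harmless demonstration with a command that always succeeds.
wait_for 3 true && echo "dependency is up"

# Intended usage (commented out so the sketch runs without Kafka installed):
# wait_for 30 zkServer.sh status &&
#     kafka-server-start.sh "$KAFKA_HOME/config/server.properties"
```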

To stop the Kafka server (the stop script takes no arguments)
[kafka@en01 hadoopsw]$ kafka-server-stop.sh

Step-4: Kafka Cluster Setup

Single Node-Single Broker Configuration


Create a Kafka Topic



Create a topic named "Topic-1" with a single partition and a replication factor of one.

[kafka@en01 hadoopsw]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Topic-1
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Created topic "Topic-1".


[kafka@en01 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test
Created topic "connect-test".

Once the topic has been created, you will see a notification in the Kafka broker terminal window, and the log for the new topic appears under the directory configured as log.dirs in config/server.properties ("/tmp/kafka-logs/" by default).

List of Topics
[kafka@en01 hadoopsw]$ kafka-topics.sh --list --zookeeper localhost:2181
Topic-1


Start Producer to Send Messages


In order to start the producer, you need to know the list of brokers to which the messages will be sent. The broker settings are defined in config/server.properties. A default broker with id=0 is already available on port 9092. The producer is created using the kafka-console-producer.sh utility.

[kafka@en01 hadoopsw]$ kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1

kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test

The producer waits for input from stdin and publishes it to the Kafka cluster. By default, every new line is published as a new message. The default producer properties are specified in the config/producer.properties file. Now you can type a few lines of messages in the terminal.

You can also pass a complete file to the producer


[kafka@en01 ~]$ kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1  < test.txt

Using echo
[kafka@en01 ~]$ echo "Salam" | kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1

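Using tail to stream a growing file (the file name below is only a placeholder)
tail -n0 -F /path/to/your.log | kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1

Explanation
tail reads from the end of the file as it grows, i.e. as log lines are continuously appended to it
-n0 outputs the last 0 lines, so only newly added lines are sent
-F follows the file by name instead of by descriptor, so it keeps working even if the file is rotated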



Start Consumer to Receive Messages


Similar to the producer, the default consumer properties are specified in the config/consumer.properties file. Open a new terminal and type the command below to consume messages.


[kafka@en01 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic Topic-1 --from-beginning

This is first Line





Now you are able to enter messages from the producer’s terminal and see them appearing in the consumer’s terminal.

You can also redirect the output to a file.

[kafka@en01 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic Topic-1 --from-beginning > output.txt


 kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning

Use Case: Use Kafka Connect to import/export data

Reading data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect (connect-standalone.sh) to import or export data.


Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. Connect provides a simple plug-in API for reading from source systems or writing to destination systems. Kafka Connect is a framework for scalably and reliably connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.
A Connect instance connecting to a set of data systems might look like this:



The Sources in Kafka Connect are responsible for ingesting data from other systems into Kafka, while the Sinks are responsible for writing data to other systems. Kafka Connect was introduced in Apache Kafka 0.9. Another feature, introduced in 0.10, is Kafka Streams, a client library for processing and analyzing data stored in Kafka. With it we can filter, transform and aggregate data streams. By combining Kafka Connect with Kafka Streams, we can build complete data pipelines.




In this post we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.


Create some seed data to test

vi /tmp/test.txt
1st Line
2nd Line

[kafka@en01 ~]$ cat /tmp/test.txt
1st Line
2nd Line


Start two connectors (source and sink) running in standalone mode:
A source connector that reads lines from an input file and produces each to a Kafka topic
A sink connector that reads messages from a Kafka topic and writes each as a line in an output file

connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties
Here we provide three configuration files as parameters to Kafka Connect.

The first file is the configuration for the Kafka Connect process itself, containing common settings such as the Kafka brokers to connect to and the serialization format for data.
Below are the contents of the file

## connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors.

## connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test


## connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=connect-test
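
Note that the source file uses the key topic while the sink file uses topics, which is easy to get wrong when editing. As a rough sanity check, the sketch below reads both values and compares them. get_prop and CONF_DIR are names I made up; the sketch writes its own copies of the two files into a temporary directory so it can run anywhere, and for real use you would point CONF_DIR at $KAFKA_HOME/config instead. The comparison only makes sense when the sink lists a single topic.

```shell
#!/bin/sh
# Print the value of KEY from a simple key=value properties file.
# Deliberately naive: escapes and line continuations are not handled.
get_prop() {
    sed -n "s/^$2=//p" "$1" | head -n 1
}

# Temporary copies of the two connector files shown above.
CONF_DIR=$(mktemp -d)

cat > "$CONF_DIR/connect-file-source.properties" <<'EOF'
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
EOF

cat > "$CONF_DIR/connect-file-sink.properties" <<'EOF'
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=connect-test
EOF

src_topic=$(get_prop "$CONF_DIR/connect-file-source.properties" topic)
sink_topics=$(get_prop "$CONF_DIR/connect-file-sink.properties" topics)

if [ "$src_topic" = "$sink_topics" ]; then
    echo "OK: source and sink both use topic '$src_topic'"
else
    echo "Mismatch: source writes '$src_topic', sink reads '$sink_topics'"
fi
```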

Note that after the command below, the connector is ready to read the content of the test.txt file

connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties

...
...
[2017-06-14 16:07:27,273] INFO Successfully joined group connect-local-file-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:375)
[2017-06-14 16:07:27,274] INFO Setting newly assigned partitions [connect-test-0] for group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:252)
[2017-06-14 16:07:35,040] INFO Finished WorkerSourceTask{id=local-file-source-0} commitOffsets successfully in 7 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:371)
[2017-06-14 16:07:37,224] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-06-14 16:07:37,227] WARN Commit of WorkerSinkTask{id=local-file-sink-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)


Now write some content to the test.txt file


echo 'event-1' >> /tmp/test.txt

Check whether the source connector has fed the test.txt content into the topic connect-test

[kafka@en01 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning 

The output on the console is:
1
2
3
4
5
{"schema":{"type":"string","optional":false},"payload":"event-1"}


Check whether the sink connector has written the content to test.sink.txt



[hdpsysuser@dn04 ~]$ cat /tmp/test.sink.txt

event-1



Some Maintenance operations.


1- Update the retention time on the topic

[hdpsysuser@dn04 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic connect-test --config retention.ms=1000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "connect-test".

2- List all topics
[kafka@en01 ~]$ kafka-topics.sh --zookeeper localhost:2181 --list
Topic-1
__consumer_offsets
connect-test

3- Describe a topic

[kafka@en01 ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic connect-test
Topic:connect-test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: connect-test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

4- Add more partitions to the topic
The command below adds 4 more partitions to the Topic-1 topic, which previously had only 1 partition, for a total of 5.
[kafka@en01 ~]$ kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 5 --topic Topic-1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
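
The warning above is about keyed messages: the partition is derived from the key and the number of partitions, so changing the partition count can send the same key to a different partition than before. The sketch below illustrates the idea with a simple "hash modulo partition count" rule. Kafka's own default partitioner uses a different hash function, so the numbers here will not match a real cluster; cksum is only a stand-in that is available in any shell, and partition_for_key is a hypothetical name.

```shell
#!/bin/sh
# partition_for_key KEY NUM_PARTITIONS
# Illustrative only: CRC of the key modulo the partition count.
# Kafka's real partitioner uses a different hash, so values will differ.
partition_for_key() {
    pk_crc=$(printf '%s' "$1" | cksum | cut -d ' ' -f 1)
    echo $(( pk_crc % $2 ))
}

# With 1 partition every key maps to partition 0; with 5 partitions
# the same keys are spread out, so their earlier ordering guarantee
# relative to each other no longer holds.
for key in user-1 user-2 user-3; do
    echo "$key: 1 partition -> $(partition_for_key "$key" 1)," \
         "5 partitions -> $(partition_for_key "$key" 5)"
done
```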

5- Add configurations to the Kafka topic
The general syntax is:
kafka-topics.sh --alter --zookeeper localhost:2181 --topic Topic-1 --config <key>=<value>

For example, the command below sets the maximum message size to 128000 bytes for the Topic-1 topic.
[kafka@en01 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Topic-1 --config max.message.bytes=128000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality

Updated config for topic "Topic-1".


6- Delete configurations from the Kafka topic
To remove the configuration override set above, we can use the command:
[kafka@en01 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Topic-1 --delete-config max.message.bytes
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "Topic-1".