Introduction
Apache Kafka is an open source, distributed publish-subscribe messaging system,
mainly designed for persistent messaging, high throughput, support for multiple clients, and real-time message visibility to consumers.
Kafka addresses the real-time problems of any software solution, that is, dealing with real-time volumes of information and routing it to multiple consumers quickly. Kafka provides seamless integration between producers and consumers of information without blocking the producers, and without letting producers know who the final consumers are. It also supports parallel data loading into Hadoop systems.
It was written in Scala and Java, originated at LinkedIn, and became an open-sourced Apache project in 2011.
Terminology
Topic: A stream of messages belonging to a particular category is called a topic. Data is stored in topics.
Partition: Topics are split into partitions. For each topic, Kafka keeps a minimum of one partition. Each partition contains messages in an immutable ordered sequence. A partition is implemented as a set of segment files of equal sizes.
Partition offset: Each message within a partition has a unique sequence id called its "offset".
Brokers: Brokers are simple systems responsible for maintaining the published data.
Producers: Producers are the publishers of messages to one or more Kafka topics. Producers send data to Kafka brokers. Every time a producer publishes a message to a broker, the broker simply appends the message to the last segment file; in other words, the message is appended to a partition. Producers can also send messages to a partition of their choice.
Consumers: Consumers read data from brokers. Consumers subscribe to one or more topics and consume published messages by pulling data from the brokers.
Leader: The leader is the node responsible for all reads and writes for a given partition. Every partition has one server acting as its leader.
Follower: A node which follows the leader's instructions is called a follower. If the leader fails, one of the followers automatically becomes the new leader. A follower acts as a normal consumer: it pulls messages and updates its own data store. (A sketch of how to inspect partition leaders and offsets follows this list.)
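For example, once Kafka is installed (steps below), a topic's partition leaders, replicas, and in-sync replicas can be inspected with the describe command, and each partition's latest offset can be printed with the GetOffsetShell tool (a sketch; the topic Topic-1 is created later in this post, and --time -1 requests the latest offset):
kafka-topics.sh --describe --zookeeper localhost:2181 --topic Topic-1
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic Topic-1 --time -1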
A Kafka cluster typically consists of multiple brokers to maintain load balance. Kafka brokers are stateless, so they use ZooKeeper to maintain their cluster state. A single Kafka broker instance can handle hundreds of thousands of reads and writes per second, and each broker can handle terabytes of messages without performance impact.
ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service is mainly used to notify producers and consumers about the presence of a new broker in the Kafka system or the failure of a broker. Based on the notification from ZooKeeper regarding the presence or failure of a broker, producers and consumers then start coordinating their tasks with some other broker.
Producers push data to brokers. When a new broker is started, all the producers discover it and automatically send messages to it. The Kafka producer does not wait for acknowledgements from the broker and sends messages as fast as the broker can handle.
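(How long a producer waits is actually governed by its acks setting: acks=0 does not wait at all, acks=1 waits for the partition leader to write the message, and acks=all waits for all in-sync replicas.)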
Kafka brokers are stateless, so the consumer has to track how many messages have been consumed by using the partition offset. If the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. Consumers can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset value is tracked in ZooKeeper.
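As a sketch, consumer groups and their stored offsets can be inspected with the kafka-consumer-groups.sh tool (the group name below is an assumption):
kafka-consumer-groups.sh --zookeeper localhost:2181 --list
kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group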
Apache Kafka - Installation Steps
Step-1: Java Installation
Kafka and ZooKeeper require a working Java installation; verify one is available with java -version.
Step-2: ZooKeeper Installation
To install the ZooKeeper framework on your machine, visit the following link and download the latest version of ZooKeeper.
http://zookeeper.apache.org/releases.html
Extract the tar file, and change ownership if required.
[root@en01 hadoopsw]# tar -zxf zookeeper-3.4.9.tar.gz
[root@en01 hadoopsw]# chown -R hdpclient:hadoop_edge zookeeper-3.4.9
Create Configuration File
Open the configuration file named conf/zoo.cfg and add the following parameters as a starting point.
[hdpclient@en01 zookeeper-3.4.9]$ vi conf/zoo.cfg
[hdpclient@en01 zookeeper-3.4.9]$ cat conf/zoo.cfg
tickTime=2000
dataDir=/home/hdpclient/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
tickTime
the basic time unit in milliseconds used by ZooKeeper. It is used for heartbeats, and the minimum session timeout will be twice the tickTime.
dataDir
the location to store the in-memory database snapshots and, unless specified otherwise, the transaction log of updates to the database. Use an absolute path here, since ZooKeeper does not expand ~.
clientPort
the port to listen for client connections
initLimit
the number of ticks that the initial synchronization phase can take
syncLimit
the number of ticks that can pass between sending a request and getting an acknowledgement
Once the configuration file has been saved, return to the terminal; you can then start the ZooKeeper server.
Create the directory to hold ZooKeeper data, as mentioned in the configuration file.
[hdpclient@en01 zookeeper-3.4.9]$ mkdir -p ~/zookeeper/data
Start ZooKeeper Server
[hdpclient@en01 zookeeper-3.4.9]$ bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[hdpclient@en01 zookeeper-3.4.9]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: standalone
Connecting to ZooKeeper
[hdpclient@en01 zookeeper-3.4.9]$ bin/zkCli.sh
After typing the above command, you will be connected to the ZooKeeper server and will get the below prompt.
[zk: localhost:2181(CONNECTED) 0]
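You can issue simple commands at this prompt to verify the server; for example, a fresh installation contains a single znode:
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]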
[zk: localhost:2181(CONNECTED) 0] quit
Quitting...
2017-06-13 16:16:18,815 [myid:] - INFO [main:ZooKeeper@684] - Session: 0x15ca17cf7690001 closed
2017-06-13 16:16:18,817 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@519] - EventThread shut down for session: 0x15ca17cf7690001
[hdpclient@en01 zookeeper-3.4.9]$ bin/zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
Step-3: Apache Kafka Installation
To install Kafka on your machine, download the latest Kafka from the below location.
https://kafka.apache.org/downloads.html
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
Extract the tar file
[root@en01 hadoopsw]# tar -zxf kafka_2.12-0.10.2.0.tgz
Change ownership if required; i.e., I've created a user kafka for the Kafka binaries.
useradd kafka
passwd kafka
[root@en01 hadoopsw]# chown -R kafka kafka_2.12-0.10.2.0
Edit ~/.bash_profile and add the below variables.
vi ~/.bash_profile
### ZooKeeper Variables
export ZOOKEEPER_HOME=/usr/hadoopsw/zookeeper-3.4.9
export PATH=$PATH:$ZOOKEEPER_HOME/bin
### Kafka Variables
export KAFKA_HOME=/usr/hadoopsw/kafka_2.12-0.10.2.0
export PATH=$PATH:$KAFKA_HOME/bin
Run the below command for the changes to take effect.
[hdpclient@en01 ~]$ source ~/.bash_profile
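You can verify that the variables are set (the value follows from the exports above):
[hdpclient@en01 ~]$ echo $KAFKA_HOME
/usr/hadoopsw/kafka_2.12-0.10.2.0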
Now you can start the Kafka server. ZooKeeper should be running before you start the Kafka server.
[hdpclient@en01 ~]$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/hadoopsw/zookeeper-3.4.9/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[kafka@en01 ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
...
...
[2017-06-14 15:45:14,470] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-06-14 15:45:14,513] INFO Kafka version : 0.10.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-14 15:45:14,514] INFO Kafka commitId : 576d93a8dc0cf421 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-14 15:45:14,514] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
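You can also confirm that the broker has registered itself in ZooKeeper (broker id 0 is the default in config/server.properties):
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0]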
To stop the Kafka server when needed:
[kafka@en01 hadoopsw]$ kafka-server-stop.sh
Step-4: Kafka Cluster Setup
Single Node-Single Broker Configuration
Create a Kafka Topic
Create a topic named "Topic-1" with a single partition and a replication factor of one.
[kafka@en01 hadoopsw]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Topic-1
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Created topic "Topic-1".
Also create a topic named "connect-test", which will be used later with Kafka Connect.
[kafka@en01 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test
Created topic "connect-test".
Once a topic has been created, you will see a notification in the Kafka broker terminal window, and the log for the created topic will appear under the log directory ("/tmp/kafka-logs/" by default) specified in the config/server.properties file.
List of Topics
[kafka@en01 hadoopsw]$ kafka-topics.sh --list --zookeeper localhost:2181
Topic-1
connect-test
Start Producer to Send Messages
In order to start the producer, you need to know the broker list where we want to send the messages. The broker list is defined in config/server.properties. A default broker with id=0 is already available on port 9092. The producer is started using the kafka-console-producer.sh utility.
[kafka@en01 hadoopsw]$ kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1
Similarly, for the connect-test topic:
kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test
The producer waits for input on stdin and publishes it to the Kafka cluster. By default, every new line is published as a new message. The default producer properties are specified in the config/producer.properties file. Now you can type a few lines of messages in the terminal as shown below.
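For example (the > prompt is printed by the console producer):
>This is first Line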
You can also pass a complete file to the producer:
[kafka@en01 ~]$ kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1 < test.txt
You can likewise publish with echo, or stream a continuously growing file with tail, as sketched below.
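For example (a sketch; the topic and file names are the ones used elsewhere in this post):
echo 'This is a message from echo' | kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1
tail -n0 -F /tmp/test.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic Topic-1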
tail reads from the end of the file as it grows or as log lines are continuously appended to it.
-n0 outputs the last 0 lines, so only new lines are selected.
-F follows the file by name instead of by file descriptor, so it keeps working even if the file is rotated.
Start Consumer to Receive Messages
[kafka@en01 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic Topic-1 --from-beginning
This is first Line
You can also redirect the consumed messages to a file:
[kafka@en01 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic Topic-1 --from-beginning > output.txt
Use Case: Use Kafka Connect to import/export data
Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect (connect-standalone.sh) to import or export data.
Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. Connect provides a simple plug-in API for reading from source systems or writing to destination systems. Kafka Connect is a framework for scalably and reliably connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.
The sources in Kafka Connect are responsible for ingesting data from other systems into Kafka, while the sinks are responsible for writing data to other systems. Note that Kafka also ships with Kafka Streams (introduced in Apache Kafka 0.10), a client library for processing and analyzing data stored in Kafka; we can filter, transform, and aggregate data streams with it. By combining Kafka Connect with Kafka Streams, we can build complete data pipelines.
In this post we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.
Create some seed data to test
vi /tmp/test.txt
1st Line
2nd Line
Below are the contents of the file:
[kafka@en01 ~]$ cat /tmp/test.txt
1st Line
2nd Line
Next, start two connectors (source and sink) running in standalone mode:
A source connector that reads lines from an input file and produces each to a Kafka topic.
A sink connector that reads messages from a Kafka topic and writes each as a line in an output file.
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties
We provide three configuration files as parameters to Kafka Connect.
The first file is the configuration for the Kafka Connect process itself, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The other two files each specify a connector to create.
## connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
## connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
## connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/test.sink.txt
topics=connect-test
Now run the connectors:
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties
...
...
2017-06-14 16:07:27,273] INFO Successfully joined group connect-local-file-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:375)
[2017-06-14 16:07:27,274] INFO Setting newly assigned partitions [connect-test-0] for group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:252)
[2017-06-14 16:07:35,040] INFO Finished WorkerSourceTask{id=local-file-source-0} commitOffsets successfully in 7 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:371)
[2017-06-14 16:07:37,224] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-06-14 16:07:37,227] WARN Commit of WorkerSinkTask{id=local-file-sink-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)
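At this point the seed data should have flowed from /tmp/test.txt through the connect-test topic into the sink file; you can verify that the sink file mirrors the source file:
cat /tmp/test.sink.txt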
While the connectors keep running, we can test the whole pipeline by appending a line to the source file:
echo 'event-1' >> /tmp/test.txt
A console consumer on the connect-test topic shows the file contents (lines 1 through 5 in this run) followed by the newly appended line:
[kafka@en01 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
1
2
3
4
5
{"schema":{"type":"string","optional":false},"payload":"event-1"}
The appended line also reaches the sink file:
[hdpsysuser@dn04 ~]$ cat /tmp/test.sink.txt
event-1
event-1
Some Maintenance Operations
1- Update the retention time on the topic
Setting retention.ms=1000 tells Kafka to retain messages on this topic for only one second, which is useful when you want to purge a topic.
[hdpsysuser@dn04 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic connect-test --config retention.ms=1000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "connect-test".
2- List all topics
[kafka@en01 ~]$ kafka-topics.sh --zookeeper localhost:2181 --list
Topic-1
__consumer_offsets
connect-test
3- Describe a topic
[kafka@en01 ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic connect-test
Topic:connect-test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: connect-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
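In this output, Leader is the id of the broker handling reads and writes for the partition, Replicas lists the brokers replicating it, and Isr is the subset of replicas currently in sync with the leader.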
4- Add more partitions to the topic
The below command adds 4 more partitions to the Topic-1 topic. Note that before this, the topic had only 1 partition.
[kafka@en01 ~]$ kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 5 --topic Topic-1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
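You can confirm the change with the describe command used earlier; it should now report PartitionCount:5:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic Topic-1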
5- Add configurations to the Kafka topic
The general syntax is:
kafka-topics.sh --alter --zookeeper localhost:2181 --topic Topic-1 --config <key>=<value>
For example, the below command sets the maximum message size to 128000 bytes for the Topic-1 topic.
[kafka@en01 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Topic-1 --config max.message.bytes=128000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "Topic-1".
6- Delete configurations from the Kafka topic
To remove the above overridden configuration, we can use this command:
[kafka@en01 ~]$ kafka-topics.sh --zookeeper localhost:2181 --alter --topic Topic-1 --delete-config max.message.bytes
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "Topic-1".