Goal
Install Kafka 2.5.0 and its dependencies on Ubuntu 16.04. Configure Kafka to run as a systemd service.
Sub-goals: Run a Hello World test for Kafka. Make a list of useful Kafka commands. Export all messages from a topic to a file.
Contents
- Goal
- Contents
- Notes
- Installation Recipe
- Hello World Test
- Kafka Commands
- Export All Messages From A Topic
Notes
Security: To perform this recipe, you should either be a) logged in over a network via SSH or b) using the machine via a directly-connected keyboard and monitor. You must be logged in as a user with sudo permissions.
In this recipe, the service user is called "franz", but you can use a different name.
Kafka relies on Zookeeper and the Java Virtual Machine (JVM). The Kafka binaries archive will include Zookeeper.
Kafka 2.5.0 requires Zookeeper 3.5.7.
By default, Zookeeper runs on port 2181 and Kafka runs on port 9092.
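Once the services are up, you can sanity-check that something is listening on those ports. A quick sketch using bash's /dev/tcp pseudo-device (my own addition, not part of the recipe):

```shell
# Probe the default Zookeeper (2181) and Kafka (9092) ports on localhost.
# A failed connect means nothing is listening on that port.
for port in 2181 9092; do
  if (exec 3<>"/dev/tcp/127.0.0.1/$port") 2>/dev/null; then
    echo "port $port: open"
  else
    echo "port $port: closed"
  fi
done
```

Each probe runs in a subshell, so the file descriptor is closed automatically when the connection check finishes.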
Excerpt from:
kafka.apache.org/documentation/#java
From a security perspective, we recommend you use the latest released version of JDK 1.8 as older freely available versions have disclosed security vulnerabilities.
Security: Consider configuring the firewall (using e.g. ufw) so that no unauthorised connection can be made from another machine.
Virtual machine specs:
- Hostname: kafkaTest4
- OS: Ubuntu 16.04.6 (LTS) x64
- RAM: 2 GB
- 1 CPU
- Disk: 50 GB SSD
Installation Recipe
### Create a new service user.
When setting up a service, it's usually best to create a new user that will manage the service and its resources.
It's important to limit the permissions of this service user account, so that it doesn't have more control over the server than necessary.
I'll give the user sudo permissions while setting up the service and installing various components, and then remove them.
Useful commands:
Get list of users on the server.
awk -F: '{ print $1 }' /etc/passwd
Check current user's sudo permissions.
sudo -l
Check another user's sudo permissions.
sudo -l -U franz
1. Create user.
sudo useradd franz --create-home
No password is created (password access will be blocked). A home directory for the user will be created:
/home/franz
2. Give the user sudo permissions.
Run:
sudo visudo
In the visudo interface, add the following line at the end of the file:
franz ALL=(ALL) NOPASSWD: ALL
3. Log in as the user. Only root or a sudo user can do this. (To log out later, run logout.)
sudo su -l franz
4. [For later use] After installing and setting up the service, remove sudo permissions from the user.
Log out from the user by running logout.
Run:
sudo visudo
In the visudo interface, delete the following line:
franz ALL=(ALL) NOPASSWD: ALL
### Install the JVM.
Check if it's already installed.
java -version
Install it if not.
sudo apt-get update
sudo apt-get -y install default-jre
java -version
Example:
franz@kafkaTest4:~$ java -version
openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-8u265-b01-0ubuntu2~16.04-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)
### Set up work directories.
# Change directory to franz user's home directory.
cd /home/franz
# Create downloads directory.
mkdir downloads
# Create kafka directory.
mkdir kafka
### Get the list of GPG keys for the Apache developers.
cd ~/downloads
wget https://downloads.apache.org/kafka/KEYS
# Check that GPG is installed.
gpg --version
# Import the keys.
gpg --import KEYS
### Download and verify Kafka binaries archive for Kafka 2.5.0.
Browse to:
downloads.apache.org/kafka/2.5.0
In the list, you should see these items:
- kafka_2.12-2.5.0.tgz
- kafka_2.12-2.5.0.tgz.asc
- kafka_2.12-2.5.0.tgz.md5
- kafka_2.12-2.5.0.tgz.sha1
- kafka_2.12-2.5.0.tgz.sha512
"2.12" is the Scala version. "2.5.0" is the Kafka version.
The .asc file is the GPG signature. The other files contain hash values.
Some details from the documentation:
kafka.apache.org/downloads
2.5.0 is the latest release. The current stable version is 2.5.0.
We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.12 is recommended).
Kafka 2.5.0 includes a number of significant new features.
[...]
- Upgrade Zookeeper to 3.5.7
- Deprecate support for Scala 2.11
cd ~/downloads
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
### Verify MD5 hash.
md5sum kafka_2.12-2.5.0.tgz | cut -d' ' -f1 > md5_calculated.txt
Example:
franz@kafkaTest4:~/downloads$ cat md5_calculated.txt
29ba62ed67483c8b060d7bb758923d5b
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz.md5
Example:
franz@kafkaTest4:~/downloads$ cat kafka_2.12-2.5.0.tgz.md5
kafka_2.12-2.5.0.tgz: 29 BA 62 ED 67 48 3C 8B 06 0D 7B B7 58 92 3D 5B
cat kafka_2.12-2.5.0.tgz.md5 | sed 's/kafka_2.12-2.5.0.tgz: //' | sed 's/ //g' | tr '[:upper:]' '[:lower:]' > md5_supplied.txt
diff md5_calculated.txt md5_supplied.txt
### Verify SHA512 hash.
sha512sum kafka_2.12-2.5.0.tgz
Example:
franz@kafkaTest4:~/downloads$ sha512sum kafka_2.12-2.5.0.tgz
447a7057bcd9faca98b6f4807bd6019ef73eee90efdc1e7b10005f669e2537a8a190cb8b9c9f4c20db1d95b13d0f0487e9cc560d0759532058439ce7f722c7cd kafka_2.12-2.5.0.tgz
sha512sum kafka_2.12-2.5.0.tgz | cut -d' ' -f1 > sha512_calculated.txt
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz.sha512
Example:
franz@kafkaTest4:~/downloads$ cat kafka_2.12-2.5.0.tgz.sha512
kafka_2.12-2.5.0.tgz: 447A7057 BCD9FACA 98B6F480 7BD6019E F73EEE90 EFDC1E7B
10005F66 9E2537A8 A190CB8B 9C9F4C20 DB1D95B1 3D0F0487
E9CC560D 07595320 58439CE7 F722C7CD
cat kafka_2.12-2.5.0.tgz.sha512 | sed 's/kafka_2.12-2.5.0.tgz: //' | tr '\n' ' ' | sed 's/ //g' | tr '[:upper:]' '[:lower:]' > sha512_supplied.txt
# Add a newline at the end of the file.
echo "" >> sha512_supplied.txt
Example:
franz@kafkaTest4:~/downloads$ cat sha512_supplied.txt
447a7057bcd9faca98b6f4807bd6019ef73eee90efdc1e7b10005f669e2537a8a190cb8b9c9f4c20db1d95b13d0f0487e9cc560d0759532058439ce7f722c7cd
diff sha512_calculated.txt sha512_supplied.txt
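The prefix-stripping, whitespace-squashing, and lowercasing steps are the same for both the .md5 and .sha512 files, so they can be wrapped in one helper. A sketch (the function name normalize_apache_hash is mine):

```shell
# Convert an Apache-style hash file ("name.tgz: 29 BA 62 ..." with optional
# continuation lines) into one lowercase hex string, matching the format
# that md5sum/sha512sum print.
normalize_apache_hash() {
  sed 's/^[^:]*: //' "$1" | tr -d ' \n' | tr '[:upper:]' '[:lower:]'
}
```

For example: [ "$(normalize_apache_hash kafka_2.12-2.5.0.tgz.sha512)" = "$(sha512sum kafka_2.12-2.5.0.tgz | cut -d' ' -f1)" ] && echo "SHA512 OK"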
### Verify GPG signature.
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz.asc
gpg --verify kafka_2.12-2.5.0.tgz.asc kafka_2.12-2.5.0.tgz
Example:
franz@kafkaTest4:~/downloads$ gpg --verify kafka_2.12-2.5.0.tgz.asc kafka_2.12-2.5.0.tgz
gpg: Signature made Wed 08 Apr 2020 01:18:52 AM UTC using RSA key ID 989E9B3F
gpg: Good signature from "David Arthur (CODE SIGNING KEY) <davidarthur@apache.org>"
gpg: WARNING: This key is not certified with a trusted signature!
gpg: There is no indication that the signature belongs to the owner.
Primary key fingerprint: 63F2 B2B8 D10C CC7A 0CE8 8A72 E9F8 7164 989E 9B3F
The important phrase is "gpg: Good signature from".
### Extract the Kafka binaries archive.
# Change directory to kafka directory.
cd ~/kafka
# Extract the archive.
tar -xvzf ~/downloads/kafka_2.12-2.5.0.tgz --strip-components 1
Note: The --strip-components 1 option removes the first directory component from each path in the archive. This means that the archive's contents will be extracted directly into the new kafka directory.
### Configure the Kafka server.
Two configuration issues:
- By default, Kafka will not allow us to delete a topic.
- By default, Kafka will store its messages and data in /tmp/kafka-logs.
Edit the configuration file to modify these settings.
Open the file ~/kafka/config/server.properties. Change this line:
log.dirs=/tmp/kafka-logs
to:
log.dirs=/var/log/kafka_data
Add this line at the bottom of the "Internal Topic Settings" section:
delete.topic.enable = true
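If you'd rather script these two edits than make them in an editor, here is a sketch (the configure_kafka_server helper is my own; it appends delete.topic.enable at the end of the file, which works because the properties file is order-independent):

```shell
# Apply both server.properties changes: point log.dirs at the new data
# directory and enable topic deletion. A .bak copy of the file is kept.
configure_kafka_server() {
  conf="$1"
  cp "$conf" "$conf.bak"
  sed -i 's|^log\.dirs=.*|log.dirs=/var/log/kafka_data|' "$conf"
  grep -q '^delete\.topic\.enable' "$conf" \
    || echo 'delete.topic.enable = true' >> "$conf"
}
```

Usage: configure_kafka_server ~/kafka/config/server.properties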
Create the log directory tree.
cd /var/log
sudo mkdir kafka_data
sudo chown franz:franz kafka_data
### Configure the Zookeeper server.
Zookeeper is a service that Kafka uses to manage its cluster state and configurations. It's included in the Kafka binaries archive.
cd ~/kafka
Open the file ~/kafka/config/zookeeper.properties. Change this line:
dataDir=/tmp/zookeeper
to:
dataDir=/var/log/zookeeper_data
Create the log directory.
cd /var/log
sudo mkdir zookeeper_data
sudo chown franz:franz zookeeper_data
### Create the unit file for Zookeeper.
This will allow us to perform common service actions such as starting, stopping, and restarting Zookeeper in a manner consistent with other Linux services.
File location:
/etc/systemd/system/zookeeper.service
Open with sudo, e.g.:
sudo vim /etc/systemd/system/zookeeper.service
File content:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=franz
ExecStart=/home/franz/kafka/bin/zookeeper-server-start.sh /home/franz/kafka/config/zookeeper.properties
ExecStop=/home/franz/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
The [Unit] section specifies that Zookeeper requires networking and the filesystem to be ready before it can start.
The [Service] section specifies that systemd should use the zookeeper-server-start.sh and zookeeper-server-stop.sh shell scripts to start and stop the service. It also specifies that Zookeeper should be restarted automatically if it exits abnormally.
### Create the unit file for Kafka.
This will allow us to perform common service actions such as starting, stopping, and restarting Kafka in a manner consistent with other Linux services.
File location:
/etc/systemd/system/kafka.service
Open with sudo, e.g.:
sudo vim /etc/systemd/system/kafka.service
File content:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=franz
ExecStart=/bin/sh -c '/home/franz/kafka/bin/kafka-server-start.sh /home/franz/kafka/config/server.properties > /var/log/kafka/kafka.log 2>&1'
ExecStop=/home/franz/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
The [Unit] section specifies that this unit file depends on zookeeper.service. This will ensure that zookeeper gets started automatically when the kafka service starts.
The [Service] section specifies that systemd should use the kafka-server-start.sh and kafka-server-stop.sh shell scripts to start and stop the service. It also specifies that Kafka should be restarted automatically if it exits abnormally.
Note: I've specified the location of the main Kafka log to be /var/log/kafka/kafka.log.
Create the log directory:
cd /var/log
sudo mkdir kafka
sudo chown franz:franz kafka
### Start the Kafka service.
sudo systemctl start kafka
Check the journal logs for the kafka unit:
sudo journalctl -u kafka
Example:
franz@kafkaTest4:/var/log$ sudo journalctl -u kafka
-- Logs begin at Mon 2020-08-10 08:17:24 UTC, end at Mon 2020-08-10 08:46:32 UTC. --
Aug 10 08:46:09 kafkaTest4 systemd[1]: Started kafka.service.
Check the status of each service:
sudo service kafka status | grep Active
sudo service zookeeper status | grep Active
Example:
franz@kafkaTest4:/var/log$ sudo service kafka status | grep Active
Active: active (running) since Mon 2020-08-10 08:46:09 UTC; 52s ago
franz@kafkaTest4:/var/log$ sudo service zookeeper status | grep Active
Active: active (running) since Mon 2020-08-10 08:46:09 UTC; 52s ago
### Kafka application logs directory problem.
Note: Don't run any of the commands in this section.
By default, the Kafka application logs (of various kinds) will be written to the directory $base_dir/logs, where $base_dir is, in this case, /home/franz/kafka. The shell script kafka-server-start.sh calls another shell script, kafka-run-class.sh.
Excerpt from bin/kafka-run-class.sh:
# Log directory to use
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi
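For reference, the test-and-assign in this excerpt is the classic spelling of the shell's default-value expansion; both use $LOG_DIR when it is set and non-empty, and fall back to $base_dir/logs otherwise. That is why exporting LOG_DIR before this script runs takes effect:

```shell
# Equivalent one-liner to the if-block above.
unset LOG_DIR                     # for the demo: pretend nothing was exported
base_dir=/home/franz/kafka
LOG_DIR="${LOG_DIR:-$base_dir/logs}"
echo "$LOG_DIR"                   # prints /home/franz/kafka/logs
```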
I have experimented with adding this line to kafka-server-start.sh, before the line that calls kafka-run-class.sh:
export LOG_DIR="/var/log/kafka/logs"
Note: You need to create the directory tree /var/log/kafka/logs and change its ownership recursively to the service user (franz).
Result: Partial success. Most logs were written to the new logs directory. However, I found that some logs were still written to /home/franz/kafka/logs. Based on timestamps, I think that when the kafka service started, it wrote some logs to $base_dir/logs, then switched over to the new logs directory.
A situation where the application logs are fractured between multiple locations is undesirable, especially if this fracturing occurs not just once but on every stop-and-start of the service.
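An alternative I haven't tested: since kafka-run-class.sh honours a LOG_DIR that is already set in its environment, the variable could be set from the unit file instead of by editing kafka-server-start.sh, by adding an Environment= line to the [Service] section of kafka.service:

```ini
[Service]
# ...existing Type=, User=, ExecStart=, ExecStop=, Restart= lines...
Environment=LOG_DIR=/var/log/kafka/logs
```

After editing the unit file, run sudo systemctl daemon-reload so systemd picks up the change.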
Rather than try to solve this right now, I've settled for a temporary solution: creating a symlink from /var/log/kafka/logs to /home/franz/kafka/logs.
### Create a symlink from the main kafka log directory to the default kafka instance's log directory.
Because the Kafka service has been started, there is now a logs directory in /home/franz/kafka. Example:
franz@kafkaTest4:/var/log$ ls -1 ~/kafka/
bin
config
libs
LICENSE
logs
NOTICE
site-docs
franz@kafkaTest4:/var/log$ ls -1 ~/kafka/logs
controller.log
kafka-authorizer.log
kafka-request.log
kafkaServer-gc.log.0.current
log-cleaner.log
server.log
state-change.log
zookeeper-gc.log.0.current
We can now symlink to this directory from our chosen main kafka log directory.
cd /var/log/kafka
ln -s /home/franz/kafka/logs logs
Example:
franz@kafkaTest4:/var/log/kafka$ ls -1
kafka.log
logs
franz@kafkaTest4:/var/log/kafka$ file logs
logs: symbolic link to /home/franz/kafka/logs
franz@kafkaTest4:/var/log/kafka$ cd logs
franz@kafkaTest4:/var/log/kafka/logs$ ls -1
controller.log
kafka-authorizer.log
kafka-request.log
kafkaServer-gc.log.0.current
log-cleaner.log
server.log
state-change.log
zookeeper-gc.log.0.current
### Set Kafka to start on boot.
sudo systemctl enable kafka
Example:
franz@kafkaTest4:~/kafka$ sudo systemctl enable kafka
Created symlink from /etc/systemd/system/multi-user.target.wants/kafka.service to /etc/systemd/system/kafka.service.
### Service commands
We've finished the installation process.
The following systemd service commands should now work. They can be used by any user with sudo permissions.
Service commands:
sudo service kafka status
sudo service kafka start
sudo service kafka stop
sudo service kafka restart
sudo service zookeeper status
sudo service zookeeper start
sudo service zookeeper stop
sudo service zookeeper restart
Note: When both services are running, stopping Zookeeper will cause Kafka to stop. When both services are inactive, starting Kafka will start Zookeeper.
Hello World Test
Summary: Publish and consume a "Hello World" message to make sure the Kafka server is behaving correctly.
Publishing messages in Kafka requires:
- A producer, which can publish messages / data to topics.
- A consumer, which reads messages / data from topics.
Change directory to the kafka directory.
cd /home/franz/kafka
Create a topic named TestTopic with a single partition and one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic
Example:
franz@kafkaTest4:~/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic
Created topic TestTopic.
Create a Kafka producer from the command line using the kafka-console-producer.sh script. It expects the Kafka server's hostname and port, and a topic name, as arguments.
Publish the string "Hello, World" to the TestTopic topic.
echo "Hello, World" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
Create a Kafka consumer using the kafka-console-consumer.sh script. It expects the Kafka server's hostname and port, and a topic name, as arguments.
The following command consumes messages from TestTopic. Note the use of the --from-beginning flag, which allows the consumption of messages that were published before the consumer was started.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Example:
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, World
Press Ctrl-C to stop the consumer script.
Example:
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, World
^CProcessed a total of 1 messages
We have successfully a) published a message to a topic and b) read a message from a topic.
Kafka Commands
You don't have to log in as the franz user. These commands can be run by another user with sudo permissions.
Before running any of these commands, change directory to the kafka installation directory.
cd /home/franz/kafka
### Get Kafka version.
bin/kafka-topics.sh --version
Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --version
2.5.0 (Commit:66563e712b0b9f84)
### Get Zookeeper version.
I don't know of a command that shows the Zookeeper version, but you can look through the libs directory to see the version number.
franz@kafkaTest4:/home/franz/kafka$ ls libs | grep zookeeper
zookeeper-3.5.7.jar
zookeeper-jute-3.5.7.jar
### List topics.
bin/kafka-topics.sh --zookeeper localhost:2181 --list
Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
TestTopic
__consumer_offsets
### Create a topic.
Create a topic named TestTopic2 with a single partition and one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic2
Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic2
Created topic TestTopic2.
### Delete a topic.
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic TestTopic2
Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic TestTopic2
Topic TestTopic2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
### Create a Kafka producer.
Create a Kafka producer from the command line using the kafka-console-producer.sh script. It expects the Kafka server's hostname and port, and a topic name, as arguments.
Publish the string "Hello, World" to the TestTopic topic.
echo "Hello, World" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
### Create a Kafka consumer.
Create a Kafka consumer using the kafka-console-consumer.sh script. It expects the Kafka server's hostname and port, and a topic name, as arguments.
The following command consumes messages from TestTopic. Note the use of the --from-beginning flag, which allows the consumption of messages that were published before the consumer was started.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Example:
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, World
Press Ctrl-C to stop the consumer script.
### List consumer groups.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Example:
# Set a consumer running in another terminal.
franz@kafkaTest4:~/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
Hello, World
# Then list consumer groups.
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-64265
### Describe consumer group.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <consumerGroupName>
Example:
franz@kafkaTest4:~/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-64265
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-64265 TestTopic 0 - 2 - consumer-console-consumer-64265-1-808d22d9-6462-4d59-9e44-3715786641e8 /127.0.0.1 consumer-console-consumer-64265-1
### Delete a consumer group.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group <consumerGroupName>
Note: Deleting the consumer group won't work if there is at least one remaining active consumer in that group.
Example:
franz@kafkaTest4:~/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-64265
Error: Deletion of some consumer groups failed:
* Group 'console-consumer-64265' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
# Stop the consumer console-consumer-64265 in the other terminal by pressing Ctrl-C.
# List consumer groups.
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-64265
# Delete the consumer group.
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-64265
Deletion of requested consumer groups ('console-consumer-64265') was successful.
# List consumer groups.
franz@kafkaTest4:~/kafka$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
### Get the latest offset in a topic.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -1
Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -1
TestTopic:0:1
The latest offset is 1. Here, "0" is the partition ID.
Send a new message on the topic to see the offset increase.
Publish the string "Hello, Foo" to the TestTopic topic.
franz@kafkaTest4:/home/franz/kafka$ echo "Hello, Foo" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
Get the new latest offset (which should be "2").
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -1
TestTopic:0:2
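Since GetOffsetShell prints topic:partition:offset lines, the number of messages currently retained is latest minus earliest, summed over partitions. A helper sketch (the name and the two-file interface are mine; it assumes both outputs list partitions in the same order):

```shell
# $1: file with --time -1 (latest) output, $2: file with --time -2 (earliest).
# Each file holds "topic:partition:offset" lines in matching partition order.
count_retained_messages() {
  paste -d: "$1" "$2" | awk -F: '{ total += $3 - $6 } END { print total + 0 }'
}
```

Usage (bash process substitution): count_retained_messages <(bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -1) <(bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -2)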
### Get the earliest offset still in a topic.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -2
Example:
franz@kafkaTest4:/home/franz/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TestTopic --time -2
TestTopic:0:0
The earliest offset in the topic TestTopic is 0.
Export All Messages From A Topic
We're going to export all the messages stored in Kafka for a specific topic to a file.
I've logged out from the user franz and am now the user stjohn.
Change to a work directory.
cd ~
mkdir export
Copy the standalone properties file from the Kafka config directory.
cp /home/franz/kafka/config/connect-standalone.properties export/exportStandalone.properties
In the file export/exportStandalone.properties, comment out these lines by adding a hash sign '#' in front of them:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
Add these lines below them:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
This means that our export connector will be able to accept any message value, instead of only JSON values.
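The converter swap can also be scripted. A sed sketch (the use_string_converters helper name is mine):

```shell
# Comment out the JsonConverter lines and append StringConverter lines,
# editing the given properties file in place.
use_string_converters() {
  conf="$1"
  sed -i 's|^\(key\.converter=org\.apache\.kafka\.connect\.json\.JsonConverter\)|#\1|' "$conf"
  sed -i 's|^\(value\.converter=org\.apache\.kafka\.connect\.json\.JsonConverter\)|#\1|' "$conf"
  printf '%s\n' \
    'key.converter=org.apache.kafka.connect.storage.StringConverter' \
    'value.converter=org.apache.kafka.connect.storage.StringConverter' >> "$conf"
}
```

Usage: use_string_converters export/exportStandalone.properties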
Create a file named exportFile.properties.
File location:
/home/stjohn/export/exportFile.properties
File content:
name=fileSink
connector.class=FileStreamSink
tasks.max=1
file=export/export.txt
topics=TestTopic
Export the topic "TestTopic" to the designated export file.
[This runs continuously, picks up where it left off, and appends to the export file.]
/home/franz/kafka/bin/connect-standalone.sh export/exportStandalone.properties export/exportFile.properties
There will be lots of output in the terminal.
Example:
# In a terminal, run the export command.
# In a second terminal:
stjohn@kafkaTest4:~/export$ cat export.txt
Hello, World
Hello, Foo
# In a third terminal, publish the string "Hello, Foo2" to the TestTopic topic.
stjohn@kafkaTest4:~$ echo "Hello, Foo2" | /home/franz/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
# In the second terminal:
stjohn@kafkaTest4:~/export$ cat export.txt
Hello, World
Hello, Foo
Hello, Foo2
# In the original terminal, press Ctrl-C to stop the export connector.
Excellent. We've successfully exported all messages in a Kafka topic to a file.
That's the end of this project.
[start of notes]
The installation recipe is derived from a previous recipe:
www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-18-04
- Authors: Hathy A and bsder
- Updated: May 10, 2019
Other information sources:
- kafka.apache.org/quickstart
- ronnieroller.com/kafka/cheat-sheet
[end of notes]