Premiers pas avec apache-kafka

Installation ou configuration

Étape 1. Installer Java 7 ou 8

Étape 2. Téléchargez Apache Kafka sur : http://kafka.apache.org/downloads.html

Par exemple, nous allons essayer de télécharger [Apache Kafka 0.10.0.0][1]

Étape 3. Extrayez le fichier compressé.

Sous Linux :

tar -xzf kafka_2.11-0.10.0.0.tgz

Sur fenêtre : Clic droit –> Extraire ici

Étape 4. Démarrer Zookeeper

cd kafka_2.11-0.10.0.0

Linux :

bin/zookeeper-server-start.sh config/zookeeper.properties

Les fenêtres:

bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Étape 5. Démarrer le serveur Kafka

Linux :

bin/kafka-server-start.sh config/server.properties

Les fenêtres:

bin/windows/kafka-server-start.bat config/server.properties

[1] : https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz

Introduction

Apache Kafka™ est une plateforme de streaming distribuée.

Ce qui signifie

1-Il vous permet de publier et de vous abonner à des flux d’enregistrements. À cet égard, il est similaire à une file d’attente de messages ou à un système de messagerie d’entreprise.

2-Il vous permet de stocker des flux d’enregistrements de manière tolérante aux pannes.

3-Il vous permet de traiter des flux d’enregistrements au fur et à mesure qu’ils se produisent.

Il est utilisé pour deux grandes classes d’application :

1-Construire des pipelines de données de streaming en temps réel qui obtiennent de manière fiable des données entre les systèmes ou les applications

2-Construire des applications de streaming en temps réel qui transforment ou réagissent aux flux de données

Les scripts de la console Kafka sont différents pour les plateformes Unix et Windows. Dans les exemples, vous devrez peut-être ajoutez l’extension en fonction de votre plateforme. Linux : scripts situés dans bin/ avec l’extension .sh. Windows : scripts situés dans bin\windows\ et avec l’extension .bat.

#Mise en place

Étape 1 : Téléchargez le code et décompressez-le :

tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0

Étape 2 : démarrez le serveur.

pour pouvoir supprimer des sujets ultérieurement, ouvrez server.properties et définissez delete.topic.enable sur true.

Kafka s’appuie fortement sur le gardien de zoo, vous devez donc le démarrer en premier. Si vous ne l’avez pas installé, vous pouvez utiliser le script pratique fourni avec kafka pour obtenir une instance ZooKeeper à nœud unique rapide et sale.

zookeeper-server-start config/zookeeper.properties
kafka-server-start config/server.properties

__Étape 3 : __ assurez-vous que tout fonctionne

Vous devriez maintenant avoir zookeeper écoutant localhost:2181 et un seul courtier kafka sur localhost:6667.

Créer un sujet

Nous n’avons qu’un seul courtier, nous créons donc un sujet sans facteur de réplication et avec une seule partition :

kafka-topics --zookeeper localhost:2181 \
    --create \
    --replication-factor 1 \
    --partitions 1 \
    --topic test-topic

Vérifiez votre sujet :

kafka-topics --zookeeper localhost:2181 --list
test-topic

kafka-topics --zookeeper localhost:2181 --describe --topic test-topic
Topic:test-topic    PartitionCount:1    ReplicationFactor:1 Configs:
Topic: test-topic   Partition: 0    Leader: 0   Replicas: 0 Isr: 0

envoyer et recevoir des messages

Lancer un consommateur :

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic

Sur un autre terminal, lancez un producteur et envoyez des messages. Par défaut, l’outil envoie chaque ligne dans un message séparé au courtier, sans encodage spécial. Écrivez quelques lignes et quittez avec CTRL+D ou CTRL+C :

kafka-console-producer --broker-list localhost:9092 --topic test-topic   
a message
another message
^D

Les messages doivent apparaître dans le terminal consommateur.

Arrêtez kafka

kafka-server-stop 

démarrer un cluster multi-courtiers

Les exemples ci-dessus utilisent un seul courtier. Pour configurer un vrai cluster, il suffit de démarrer plusieurs serveurs kafka. Ils se coordonneront automatiquement.

Étape 1 : pour éviter les collisions, nous créons un fichier server.properties pour chaque courtier et modifions les propriétés de configuration id, port et logfile.

Copie:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

Modifiez les propriétés de chaque fichier, par exemple :

vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/usr/local/var/lib/kafka-logs-1

vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/usr/local/var/lib/kafka-logs-2

Étape 2 : démarrer les trois courtiers :

    kafka-server-start config/server.properties &
    kafka-server-start config/server-1.properties &
    kafka-server-start config/server-2.properties &

Créer un sujet répliqué

kafka-topics --zookeeper localhost:2181 --create --replication-factor 3 --partitions 1 --topic replicated-topic

kafka-topics --zookeeper localhost:2181 --describe --topic replicated-topic
Topic:replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

Cette fois, il y a plus d’informations :

  • “leader” est le nœud responsable de toutes les lectures et écritures pour la partition donnée. Chaque nœud sera le leader d’une partie des partitions sélectionnée au hasard.
  • “réplicas” est la liste des nœuds qui répliquent le journal pour cette partition, qu’ils soient le leader ou même s’ils sont actuellement actifs.
  • “isr” est l’ensemble des répliques “in-sync”. Il s’agit du sous-ensemble de la liste des répliques qui est actuellement en vie et rattrapé par le leader.

Notez que le sujet précédemment créé reste inchangé.

tester la tolérance aux pannes

Publiez un message dans le nouveau sujet :

kafka-console-producer --broker-list localhost:9092 --topic replicated-topic
hello 1
hello 2
^C

Tuez le chef (1 dans notre exemple). Sous Linux :

ps aux | grep server-1.properties
kill -9 <PID>

Sous Windows :

wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties" 
taskkill /pid <PID> /f

Voyez ce qui s’est passé :

kafka-topics --zookeeper localhost:2181  --describe --topic replicated-topic
Topic:replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:
Topic: replicated-topic Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

La direction est passée au courtier 2 et “1” n’est plus synchronisé. Mais les messages sont toujours là (utilisez le consommateur pour vérifier par vous-même).

Nettoyer

Supprimez les deux sujets en utilisant :

kafka-topics --zookeeper localhost:2181 --delete --topic test-topic
kafka-topics --zookeeper localhost:2181 --delete --topic replicated-topic