簡介
分布式流處理平台,一個是基於訂閱發布的消息系統,由Linkedin發起,用作活動流和運營數據處理管道的基礎消息平台。
用途
- 造實時流數據管道(message queue)
- 構建實時流式應用(topic 之間內部變化)
概念
通過topic 對存儲的流數據做分類
每條記錄包含一個key,一個value 和一個timestamp
四個核心API
Produce API:允許應用發布一串流式數據到一個或多個Kafka topic
Consumer API: 允許應用訂閱一個或多個topic,並缺對發布給他們的流式數據進行處理
Stream API:允許一個應用程式作為一個流處理器,消費一個或多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中取,在輸入和輸出流中進行有效的轉換。
The Connector API 允許構建並運行可重用的生產者或消費者,將Kafka topic 連接到已存在的應用程式或者數據系統。如連接一個關係資料庫,捕捉表(table)的所有變更內容。
Topics和日誌
對每個數據主題Topic,Kafka都會維持一個分區日誌,每個分區都是有序且順序不可變的記錄集,並且不斷地追加到結構化的commit log文件。分區中的每個記錄都會分配一個id來表示順序,稱為offset,用來唯一標識分區中每一條記錄。
分布式
日誌的分區partition分布在kafka集群的伺服器上。每個伺服器在處理數據和請求時,共享這些分區。
每個分區都有一台server 作為leader , 零台或多台server作為follwers。 Leader server處理一切對partition分區的讀寫請求,followers只需要被動同步leader上的數據。Leader宕機時,followers中的一台自動成為新的leader。每台server都會成為某些分區的leaders和某些分區的follower,因此集群的負載時平衡的。
生產者
將數據發布到所選擇的topic中。負責價格記錄分配到topic的哪一個partition中。
消費者
使用一個消費組名稱來進行標識,發布到topic中的每條記錄被分配給訂閱消費組中的一個消費實例。消費者實例可以分布在多個進程中或多個機器上。
Kafka只保證分區內的記錄是有序的,不保證主題中不同分區的順序·。每個patition分區按照key值排序足以滿足大多數應用程式的需求。
使用
step1:下載
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.13-2.4.1.tgz
step2:啟動伺服器
先啟動zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties
[2020-03-14 23:48:31,493] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
再啟動kafka
./bin/kafka-server-start.sh config/server.properties
[2020-03-14 23:49:18,998] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration)
step3:創建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
運行list (列表)命令查看topic
./bin/kafka-topics.sh --list --zookeeper
localhost:2181
test
Step4:發送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message
Step5:啟動一個consumer
輸出消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
another message
Step6:設置多集群代理
本地機器把集群擴展到三個節點
>cp config/server.properties config/server-1.properties
>cp config/server.properties config/server-1.properties
編輯屬性
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
啟動兩個新節點
bin/kafka-server-start.sh config/server-1.properties &
...
bin/kafka-server-start.sh config/server-2.properties & ...
創建一個3副本topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
第一行表示分區摘要,只有一個分區
- 「leader」是負責給定分區所有讀寫操作的節點。每個節點都是隨機選擇的部分分區的領導者。
- 「replicas」是複製分區日誌的節點列表,不管這些節點是leader還是僅僅活著。
- 「isr」是一組「同步」replicas,是replicas列表的子集,它活著並被指到leader。
發布一些消息給topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
消費這些消息
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
小結
Kafka以時間複雜度O(1)提供消息持久化能力,TB級數據保證常數時間複雜度訪問性能。高吞吐率,廉價商用機上也能做到單機10萬條消息/s的傳輸吞吐量。通過消息分區,分布式消費,支持每個Partition內保持順序。Kafka同時支持離線數據處理和實時數據處理。也支持在線水平擴展(Scale out)這些特性,使之成為分布式數據平台中消息交互的首選組件之一。
參考
http://kafka.apachecn.org/