分布式數據平台,組件篇之-Kafka

無雙譜jerry 發佈 2020-04-03T23:26:51+00:00

用途造實時流數據管道構建實時流式應用概念通過topic對存儲的流數據做分類每條記錄包含一個key,一個value 和一個timestamp四個核心APIProduce API:允許應用發布一串流式數據到一個或多個Kafka topicConsumer API: 允許應用訂閱一個或

簡介




分布式流處理平台,一個是基於訂閱發布的消息系統,由Linkedin發起,用作活動流和運營數據處理管道的基礎消息平台。


用途


  • 造實時流數據管道(message queue)
  • 構建實時流式應用(topic 之間內部變化)


概念

通過topic 對存儲的流數據做分類

每條記錄包含一個key,一個value 和一個timestamp


四個核心API

Produce API:允許應用發布一串流式數據到一個或多個Kafka topic

Consumer API: 允許應用訂閱一個或多個topic,並缺對發布給他們的流式數據進行處理

Stream API:允許一個應用程式作為一個流處理器,消費一個或多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中取,在輸入和輸出流中進行有效的轉換。

The Connector API 允許構建並運行可重用的生產者或消費者,將Kafka topic 連接到已存在的應用程式或者數據系統。如連接一個關係資料庫,捕捉表(table)的所有變更內容。


Topics和日誌


對每個數據主題Topic,Kafka都會維持一個分區日誌,每個分區都是有序且順序不可變的記錄集,並且不斷地追加到結構化的commit log文件。分區中的每個記錄都會分配一個id來表示順序,稱為offset,用來唯一標識分區中每一條記錄。


分布式


日誌的分區partition分布在kafka集群的伺服器上。每個伺服器在處理數據和請求時,共享這些分區。

每個分區都有一台server 作為leader , 零台或多台server作為follwers。 Leader server處理一切對partition分區的讀寫請求,followers只需要被動同步leader上的數據。Leader宕機時,followers中的一台自動成為新的leader。每台server都會成為某些分區的leaders和某些分區的follower,因此集群的負載時平衡的。


生產者


將數據發布到所選擇的topic中。負責價格記錄分配到topic的哪一個partition中。


消費者


使用一個消費組名稱來進行標識,發布到topic中的每條記錄被分配給訂閱消費組中的一個消費實例。消費者實例可以分布在多個進程中或多個機器上。


Kafka只保證分區內的記錄是有序的,不保證主題中不同分區的順序·。每個patition分區按照key值排序足以滿足大多數應用程式的需求。


使用



step1:下載

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.13-2.4.1.tgz

step2:啟動伺服器

先啟動zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

[2020-03-14 23:48:31,493] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

再啟動kafka

./bin/kafka-server-start.sh config/server.properties

[2020-03-14 23:49:18,998] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration)

step3:創建topic

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Created topic test.

運行list (列表)命令查看topic

./bin/kafka-topics.sh --list --zookeeper

localhost:2181

test

Step4:發送消息

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message

Step5:啟動一個consumer

輸出消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

another message

Step6:設置多集群代理

本地機器把集群擴展到三個節點

>cp config/server.properties config/server-1.properties

>cp config/server.properties config/server-1.properties

編輯屬性

config/server-1.properties:

broker.id=1

listeners=PLAINTEXT://:9093

log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2

listeners=PLAINTEXT://:9094

log.dir=/tmp/kafka-logs-2


啟動兩個新節點

bin/kafka-server-start.sh config/server-1.properties &

...

bin/kafka-server-start.sh config/server-2.properties & ...

創建一個3副本topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs:

Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

第一行表示分區摘要,只有一個分區

  • 「leader」是負責給定分區所有讀寫操作的節點。每個節點都是隨機選擇的部分分區的領導者。
  • 「replicas」是複製分區日誌的節點列表,不管這些節點是leader還是僅僅活著。
  • 「isr」是一組「同步」replicas,是replicas列表的子集,它活著並被指到leader。


發布一些消息給topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2

^C

消費這些消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C


小結


Kafka以時間複雜度O(1)提供消息持久化能力,TB級數據保證常數時間複雜度訪問性能。高吞吐率,廉價商用機上也能做到單機10萬條消息/s的傳輸吞吐量。通過消息分區,分布式消費,支持每個Partition內保持順序。Kafka同時支持離線數據處理和實時數據處理。也支持在線水平擴展(Scale out)這些特性,使之成為分布式數據平台中消息交互的首選組件之一。



參考

http://kafka.apachecn.org/

關鍵字: