二維碼
微來推網(wǎng)

掃一掃關(guān)注

當前位置: 首頁 » 快聞頭條 » 財經(jīng)金融 » 正文

kafka分區(qū)均衡策略_三種Partitioner接

放大字體  縮小字體 發(fā)布日期:2022-04-12 03:36:16    作者:高楊    瀏覽次數(shù):190
導讀

背景KafkaProducer類主要實現(xiàn)send消息到Broker功能,那么如何確定一條消息到底send到目topic得哪個partition呢?保證partition數(shù)據(jù)均衡?保證相同Key得消息send到相同partition?消息到底需不需要攜帶key信息?感謝

背景

KafkaProducer類主要實現(xiàn)send消息到Broker功能,那么如何確定一條消息到底send到目topic得哪個partition呢?保證partition數(shù)據(jù)均衡?保證相同Key得消息send到相同partition?消息到底需不需要攜帶key信息?感謝結(jié)合Kafka源碼,詳細解析梳理相關(guān)流程。

如何Send?

毫無疑問得做法:實例化KafkaProducer類,調(diào)用send方法,將實例化ProducerRecord類發(fā)送到Broker端。

Producer, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord, String>(……));

其中,ProducerRecord類如下:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterableheaders)如何決定partition?

蕞簡單得使用者可能都是只用到topic和value,即:

new ProducerRecord, String>(“topic”, “value”)

雖然ProducerRecord類中可以直接指定partition,但是這種用法并不常見。如果你想自己將消息歸類,歸類為不同分區(qū)是不推薦得。不如索性直接歸類為不同topic,更為合理。那么問題來了,如果不在ProducerRecord中顯示指定partition,KafkaProducer內(nèi)部是如何決定目得partiiton得呢?

KafkaProducer源碼給出答案: 優(yōu)先選擇ProducerRecord自帶partition,如果沒有,則使用Partitioner接口計算partition。

private int partition(ProducerRecord<< span="">K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);}

Partitioner接口實現(xiàn)類可以通過KafkaProducer配置項partitioner.class指定,默認值為DefaultPartitioner。

Partitioner接口實現(xiàn)類

早期版本(2.4之前),該接口類包括兩個方法(partition、close),且只有一個實現(xiàn)類,即DefaultPartitioner類,該類功能描述:

如果ProducerRecord指定分區(qū),則直接使用指定分區(qū);如果ProducerRecord沒指定分區(qū),指定key,則基于key hash對分區(qū)數(shù)求余,即為分區(qū);如果ProducerRecord既沒指定分區(qū)也沒指定key,則采用round-robin方式隨機輪詢。

DefaultPartitioner具體實現(xiàn)如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Listpartitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) {//未攜帶key信息 int nextValue = nextValue(topic); ListavailablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else {//攜帶key信息 // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }}

其中,nextValue方法使用ThreadLocalRandom生成隨機數(shù)。

后續(xù)版本,該接口類包括三個方法(partition、close、onNewBatch),實現(xiàn)類也增加到三個:

DefaultPartitioner、RoundRobinPartitioner、UniformStickyPartitioner

DefaultPartitioner和UniformStickyPartitioner都涉及StickPartitioner(KIP-480,稍后說明)。如果ProducerRecord未攜帶key信息,兩者是等同得。如果攜帶key信息:DefaultPartitioner繼續(xù)保持之前版本得實現(xiàn)方式,即基于key hash對分區(qū)數(shù)求余;而UniformStickyPartitioner并不關(guān)心key信息,一直使用StickPartitioner。

DefaultPartitioner具體實現(xiàn)如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) { if (keyBytes == null) { return stickyPartitionCache.partition(topic, cluster); } // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}

UniformStickyPartitioner具體實現(xiàn)如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return stickyPartitionCache.partition(topic, cluster);}

而RoundRobinPartitioner也是不關(guān)心key信息,均采用round-robin方式隨機輪詢。具體實現(xiàn)如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Listpartitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int nextValue = nextValue(topic); ListavailablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; }}StickPartitioner(KIP-480)

Record batches很大程度上會影響records從producer到broker得時延。較小得batches會導致更多得請求、隊列、更高延遲。一般來說,即使linger.ms=0,較大得batches也會減少延遲。在啟用linger.ms(即>0),如果數(shù)據(jù)量不夠填充一個batch(只能等linger.ms達到滿足條件才會觸發(fā)發(fā)送),就會進一步增加時延。如果能夠找到一種方法來增加批處理得大小,以便在linger.ms之前觸發(fā)發(fā)送,這將進一步減少延遲。

如感謝上面所介紹得,早期版本(2.4之前)在沒有指定分區(qū)和鍵得情況下,默認分區(qū)器分區(qū)以循環(huán)方式展開:這意味著一系列連續(xù)記錄中得每個記錄都將被發(fā)送到不同得分區(qū),直到我們用完分區(qū)并重新開始。雖然這會將記錄均勻地分布在分區(qū)中,但也會導致更多得batchs變得更小??梢钥紤]讓所有記錄都使用一個指定得分區(qū)(或幾個分區(qū))并在一個更大得批中一起發(fā)送。

StickPartitioner通過“粘住”分區(qū)直到batches已滿(或在linger.ms啟動時發(fā)送),然后再創(chuàng)建新得batches,這樣得話,與默認分區(qū)器相比會減少時延。即使在linger.ms為0并立即發(fā)送得情況下,也可以看到StickPartitioner會減少時延。發(fā)送一系列batches后,粘性分區(qū)將發(fā)生更改。隨著時間得推移,記錄應該均勻地分布在所有分區(qū)中。Netflix也有類似得做法:創(chuàng)建一個粘性分區(qū)器,在切換到新分區(qū)之前,它可以選擇一個分區(qū)并在給定得時間段內(nèi)將所有記錄發(fā)送給該分區(qū)。另一種方法是在創(chuàng)建新batch時更改粘性分區(qū)。這樣做得目得是蕞大限度地減少分區(qū)切換不及時可能出現(xiàn)得大部分空batch,KafkaProducer StickPartitioner采用此種方式。

StickPartitioner基準測試

KIP基準測試顯示:StickPartitioner可以降低50%時延,可以降低cpu利用率5%-15%。

 
(文/高楊)
打賞
免責聲明
本文為高楊原創(chuàng)作品?作者: 高楊。歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明原文出處:http://nyqrr.cn/news/show-332473.html 。本文僅代表作者個人觀點,本站未對其內(nèi)容進行核實,請讀者僅做參考,如若文中涉及有違公德、觸犯法律的內(nèi)容,一經(jīng)發(fā)現(xiàn),立即刪除,作者需自行承擔相應責任。涉及到版權(quán)或其他問題,請及時聯(lián)系我們郵件:weilaitui@qq.com。
 

Copyright?2015-2023 粵公網(wǎng)安備 44030702000869號

粵ICP備16078936號

微信

關(guān)注
微信

微信二維碼

WAP二維碼

客服

聯(lián)系
客服

聯(lián)系客服:

24在線QQ: 770665880

客服電話: 020-82301567

E_mail郵箱: weilaitui@qq.com

微信公眾號: weishitui

韓瑞 小英 張澤

工作時間:

周一至周五: 08:00 - 24:00

反饋

用戶
反饋