來源:徐雷frank 發(fā)布時間:2019-02-23 16:54:54 閱讀量:1564
Apache Kafka是開源分布式高并發(fā)消息中間件,支持每秒百萬級消息并發(fā),在互聯(lián)網(wǎng)高并發(fā)架構:雙11、電商秒殺搶購、網(wǎng)絡直播、IOT大數(shù)據(jù)采集、聊天App、導航等高并發(fā)架構中大量使用。
生產環(huán)境一般要求搭建Kafka集群。Java開發(fā)Kafka集群需要注意參數(shù)的詳細配置,Kafka參數(shù)的含義在配置集群的時候非常重要,尤其是關系性能和集群的參數(shù)。下面我們一起來看看Kafka的詳細參數(shù)。
broker的身份ID。必須為每個代理設置一個唯一的整數(shù)。集群配置時候非常重要。不能重復。
broker.id = 1
Kafka服務器套接字服務器偵聽的地址。 如果未配置,就會使用java.net.InetAddress.getCanonicalHostName()地址。格式:
#listeners = listener_name:// host_name:port
例如:
listeners = PLAINTEXT://localhost:9091
Broker向生產者和消費者通告的主機名和端口。如果沒有設定,它將使用“偵聽器”的值。否則,它將使用該值從java.net.InetAddress.getCanonicalHostName()返回的地址。
#advertised.listeners = PLAINTEXT://host.name:9092
Kafka偵聽器名稱映射到安全協(xié)議,默認為它們是相同名稱
#listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
服務器用于從網(wǎng)絡接收請求并向網(wǎng)絡發(fā)送響應消息的線程數(shù)
num.network.threads = 3
服務器用于處理請求的線程數(shù),可能包括磁盤I / O需要的線程.
num.io.threads = 8
套接字服務器使用的發(fā)送緩沖區(qū)(SO_SNDBUF)大小,字節(jié)。
socket.send.buffer.bytes = 102400
套接字服務器使用的接收緩沖區(qū)(SO_RCVBUF)大小,字節(jié)。
socket.receive.buffer.bytes = 102400
套接字服務器將接受的請求的最大大小(防止OOM內存溢出)大小,字節(jié)。
socket.request.max.bytes = 104857600
逗號分隔的目錄列表,用于存儲日志文件
log.dirs =。/日志
每個主題的默認日志分區(qū)數(shù)。更多分區(qū)允許更大并行處理消息,但這也會導致更多的文件
num.partitions = 1
在啟動時用于日志恢復和在關閉時刷新的每個數(shù)據(jù)目錄文件夾需要的線程數(shù)。
#對于數(shù)據(jù)目錄文件夾位于RAID陣列中的情況,建議增加此線程數(shù)值。
num.recovery.threads.per.data.dir = 1
Topic主題的消費者的group元數(shù)據(jù)復制因子,"__consumer_offsets" 和"__transaction_state" 。
offsets.topic.replication.factor = 1
transaction.state.log.replication.factor = 1
transaction.state.log.min.isr = 1
Kafka消息立即寫入文件系統(tǒng),但默認情況下我們只有fsync()來緩慢延遲地同步操作系統(tǒng)緩存消息到磁盤上。 以下配置參數(shù)控制將消息數(shù)據(jù)刷新到磁盤過程。這里有一些重要的權衡:
1、持久性:如果Kafka不使用復制,則可能會丟失未刷新的數(shù)據(jù)。
2、延遲:當Kafka刷新確實發(fā)生時,非常大的刷新間隔可能會導致延遲峰值,因為會有大量數(shù)據(jù)需要刷新到磁盤,間隔太久緩沖消息越多。
3、吞吐量:Flush通常是最昂貴的操作,并且小的Flush間隔可能導致過多的磁盤IO操作搜索。
下面以下設置允許配置刷新策略以在一段時間后或每N條消息(或兩者)刷新數(shù)據(jù)。這可以在全局范圍內配置完成,也可以針對每個主題的單獨配置。
#強制刷新數(shù)據(jù)到磁盤之前要接受的消息數(shù),10000消息時批量刷盤
#log.flush.interval.messages = 10000
#強制刷新之前消息可以在日志中停留的最長時間 1000毫秒
#log.flush.interval.ms = 1000
以下配置參數(shù)控制Kafka日志段的處理策略,我們可以設置為在一段時間后或在累積給定大小后刪除日志數(shù)據(jù)段。只要滿足一下任意條件,就會刪除一個日志段。刪除總是從日志的末尾開始。
log log的最小時間長度,超過刪除日志。
#由于時間可以刪除的日志文件的最小年齡 168小時
log.retention.hours = 168
基于大小的日志保留策略。 除非剩余的段大小在log.retention.bytes之下,否則將從日志中刪除段。 功能獨立于log.retention.hours限制。
#log.retention.bytes = 1073741824
#日志段文件的最大大小。達到此大小時,將創(chuàng)建新的日志段。
log.segment.bytes = 1073741824
#檢查日志段以查看是否可以刪除日志段的時間間隔
#保留策略
log.retention.check.interval.ms = 300000
Zookeeper連接字符串。默認單臺設置,集群需要多臺,逗號分隔的主機:端口,每個對應一個Zookeeper。例如“127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。
#所有kafka znodes的根目錄。集群設置模式
zookeeper.connect =localhost:2181,localhost:2182,localhost:2183
連接到zookeeper的超時時間(以毫秒為單位)
zookeeper.connection.timeout.ms = 6000
以下配置指定GroupCoordinator將延遲初始消費者重新平衡的時間(以毫秒為單位)。
#當新成員加入組時,重新平衡延遲是group.initial.rebalance.delay.ms的值,最多max.poll.interval.ms。
默認值為3秒。我們將此參數(shù)設置為0,方便開發(fā)和測試。但是生產環(huán)境中默認值推薦3秒更合適,因為這有助于避免在應用程序啟動期間不必要且可能很昂貴的重新平衡過程,減少系統(tǒng)資源的消耗。
group.initial.rebalance.delay.ms = 0
在集群和優(yōu)化情況下需要了解每個參數(shù)的確切含義,對于Kafka集群的設置,需要配置多個Zookeeper地址。默認的日志清理、垃圾回收、連接池、線程模型都是非常重要的因素。
使用最新的Java Spring Boot 2.x版本連接Kafka需要在配置文件中修改地址參數(shù):
spring.kafka.consumer.group-id=myGroup spring.kafka.bootstrap-servers=localhost:9091,localhost:9092,localhost:9093