kafkakey(kafka可以脱离zookeeper单独使用吗)
本篇文章给大家谈谈kafkakey,以及kafka可以脱离zookeeper单独使用吗对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、深入理解kafka(五)日志存储
- 2、key为null时Kafka会将消息发送给哪个分区
- 3、如何确定Kafka的分区数,key和consumer线程数
- 4、kafka配置参数详解
- 5、kafka中的key有啥作用?
- 6、如何查看kafka消息的key
深入理解kafka(五)日志存储
5.1文件目录布局
根目录下有以下5个checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, meta.properties, recovery-point-offset-checkpoint, replication-offset-checkpoint
分区目录下有以下目录: 0000xxx.index(偏移量为64位长整形,长度固定为20位), 0000xxx.log, 0000xxx.timeindex.
还有可能包含.deleted .cleaned .swap等临时文件, 以及可能的.snapshot .txnindex leader-epoch-checkpoint
5.2日志格式演变
5.2.1 v0版本
kafka0.10.0之前
RECORD_OVERHEAD包括offset(8B)和message size(4B)
RECORD包括:
crc32(4B):crc32校验值
magic(1B):消息版本号0
attributes(1B):消息属性。低3位表示压缩类型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(0.9.x引入)
key length(4B):表示消息的key的长度。-1代表null
key: 可选
value length(4B):实际消息体的长度。-1代表null
value: 消息体。可以为空,如墓族颤碑消息
5.2.2 v1版本
kafka0.10.0-0.11.0
比v0多了timestamp(8B)字段,表示消息的时间戳
attributes的第4位也被利用起来,0表示timestamp的类型为CreateTime,1表示timestamp的类型为LogAppendTime
timestamp类型由broker端参数log.message.timestamp.type来配置,默认为CreateTime,即采用生产者创建的时间戳
5.2.3 消息压缩
保证端到端的压缩,服务端配置compression.type,默认为"producer",表示保留生产者使用的压缩方式,还可以配置为"gzip","snappy","lz4"
多条消息压缩至value字段,以提高压缩率
5.2.4 变长字段
变长整缺隐形(Varints):每一个字节都有一个位于最高位的msb位(most significant bit),除了最后一个字节为1,其余都为0,字节倒序排列
为了使编码更加高效,Varints使用ZigZag编码:sint32对应 (n1)^(n31) sint64对应 (n1)^(n63)
5.2.5 v2版本
Record Batch
first offset:
length:
partition leader epoch:
magic:固定为2
attributes:两个字节。低3位表示压缩格式,第4位表示时间戳类型,第5位表示事务(0-非事务1-事务),第6位控制消息(0-非控制1控制)
first timestamp:
max timestamp:
producer id:
producer epoch:
first sequence:
records count:
v2版本的消息去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增伏穗厅量)和headers信息,并且弃用了attributes
Record
length:
attributes:弃用,但仍占据1B
timestamp delta:
offset delta:
headers:
5.3日志索引
稀疏索引(sparse index):每当写入一定量(broker端参数log.index.interval.bytes指定,默认为4096B),偏移量索引文件和时间索引文件分别对应一个索引项
日志段切分策略:
1.大小超过broker端参数log.segment.bytes配置的值,默认为1073741824(1GB)
2.当前日志段消息的最大时间戳与当前系统的时间戳差值大于log.roll.ms或者log.roll.hours,ms优先级高,默认log.roll.hours=168(7天)
3.索引文件或者时间戳索引文件的大小大于log.index.size.max.bytes配置的值,默认为10485760(10MB)
4.偏移量差值(offset-baseOffset)Integer.MAX_VALUE
5.3.1 偏移量索引
每个索引项占用8个字节,分为两个部分:1.relativeOffset相对偏移量(4B) 2.position物理地址(4B)
使用kafka-dump-log.sh脚本来解析.index文件(包括.timeindex、.snapshot、.txnindex等文件),如下:
bin/kafka-dump-log.sh --files /tmp/kafka-logs/topicId-0/00……00.index
如果broker端参数log.index.size.max.bytes不是8的倍数,内部会自动转换为8的倍数
5.3.2 时间戳索引
每个索引项占用12个字节,分为两个部分:1.timestamp当前日志分段的最大时间戳(12B) 2.relativeOffset时间戳对应的相对偏移量(4B)
如果broker端参数log.index.size.max.bytes不是12的倍数,内部会自动转换为12的倍数
5.4日志清理
日志清理策略可以控制到主题级别
5.4.1 日志删除
broker端参数log.cleanup.policy设置为delete(默认为delete)
检测周期broker端参数log.retention.check.interval.ms=300000(默认5分钟)
1.基于时间
broker端参数log.retention.hours,log.retention.minutes,log.retention.ms,优先级msminuteshours
删除时先增加.delete后缀,延迟删除根据file.delete.delay.ms(默认60000)配置
2.基于日志大小
日志总大小为broker端参数log.retention.bytes(默认为-1,表示无穷大)
日志段大小为broker端参数log.segment.bytes(默认为1073741824,1GB)
3.基于日志起始偏移量
DeleteRecordRequest请求
1.KafkaAdminClient的deleteRecord()
2.kafka-delete-record.sh脚本
5.4.2 日志压缩
broker端参数log.cleanup.policy设置为compact,且log.cleaner.enable设置为true(默认为true)
5.5磁盘存储
相关测试:一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性写入600MB/s,随机写入100KB/s,随机内存写入400MB/s,线性内存3.6GB/s
5.5.1 页缓存
Linux操作系统的vm.dirty_background_ratio参数用来指定脏页数量达到系统的百分比之后就触发pdflush/flush/kdmflush,一般小于10,不建议为0
vm.dirty_ratio表示脏页百分比之后刷盘,但是阻塞新IO请求
kafka同样提供同步刷盘及间断性强制刷盘(fsync)功能,可以通过log.flush.interval.messages、log.flush.interval.ms等参数来控制
kafka不建议使用swap分区,vm.swappiness参数上限为100,下限为0,建议设置为1
5.5.2 磁盘I/O流程
一般磁盘IO的场景有以下4种:
1.用户调用标准C库进行IO操作,数据流为:应用程序Buffer-C库标准IOBuffer-文件系统也缓存-通过具体文件系统到磁盘
2.用户调用文件IO,数据流为:应用程序Buffer-文件系统也缓存-通过具体文件系统到磁盘
3.用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘
4.用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接读写磁盘
Linux系统中IO调度策略有4种:
1.NOOP:no operation
2.CFQ
3.DEADLINE
4.ANTICIPATORY
5.5.3 零拷贝
指数据直接从磁盘文件复制到网卡设备中,不需要经应用程序
对linux而言依赖于底层的sendfile()
对java而言,FileChannal.transferTo()的底层实现就是sendfile()
[img]key为null时Kafka会将消息发送给哪个分区
当你编写铅宽kafka Producer时, 会生成KeyedMessage对象。
KeyedMessageK, V keyedMessage = new KeyedMessage(topicName, key, message)
这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:
The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.
但是这句话相当的误导人。
从字面上来讲,这句话没有问题, 但是这里的随机是指在参数"topic.metadata.refresh.ms"刷新后随机选择一个, 这个时间段内总是使用唯一的分区。 默认情况下每十分钟才可能重新选择一个新的分区。 但是相信大部分的程序员和我一样, 都理解成每个消息都会随机选择一个分区。
可以查看相关散仔的代码:
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
val numPartitions = topicPartitionList.size if(numPartitions = 0)
throw new UnknownTopicOrPartitionException("Topic "槐掘亮 + topic + " doesn't exist")
val partition =
if(key == null) {
// If the key is null, we don't really need a partitioner
// So we look up in the send partition cache for the topic to decide the target partition
val id = sendPartitionPerTopicCache.get(topic)
id match {
case Some(partitionId) =
// directly return the partitionId without checking availability of the leader,
// since we want to postpone the failure until the send operation anyways
partitionId case None =
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)
partitionId }
} else
partitioner.partition(key, numPartitions)
if(partition 0 || partition = numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
"; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
partition }
如果key为null, 它会从sendPartitionPerTopicCache查选缓存的分区, 如果没有,随机选择一个分区,否则就用缓存的分区。
LinkedIn工程师Guozhang Wang在邮件列表中解释了这一问题,
最初kafka是按照大部分用户理解的那样每次都随机选择一个分区, 后来改成了定期选择一个分区, 这是为了减少服务器段socket的数量。不过这的确很误导用户,据称0.8.2版本后又改回了每次随机选取。但是我查看0.8.2的代码还没看到改动。
所以,如果有可能,还是为KeyedMessage设置一个key值吧。
当你编写kafka Producer时, 会生成KeyedMessage对象。
KeyedMessageK, V keyedMessage = new KeyedMessage(topicName, key, message)
这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:
The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.
但是这句话相当的误导人。
如何确定Kafka的分区数,key和consumer线程数
publicstaticvoidconsumer(){Propertiesprops=newProperties();props.put("团弊zk.connect"基或猜,"hadoop-2:2181");props.put("zk.connectiontimeout.ms","1000000");props.put("groupid","fans_group"搏型);//Createtheconnectiontothe
kafka配置参数详解
kafka的配置分为 broker、帆态producter、consumer三个不同的配置
一 BROKER 的全局配置
最为核心的三个配置 broker.id、陵吵log.dir、zookeeper.connect 。
------------------------------------------- 系统 相关 -------------------------------------------
broker.id =1
log.dirs = /tmp/kafka-logs
port =6667
message.max.bytes =1000000
num.network.threads =3
num.io.threads =8
background.threads =4
queued.max.requests =500
host.name
advertised.host.name
advertised.port
socket.send.buffer.bytes =100*1024
socket.receive.buffer.bytes =100*1024
socket.request.max.bytes =100 1024 1024
------------------------------------------- LOG 相关 -------------------------------------------
log.segment.bytes =1024 1024 1024
log.roll.hours =24*7
log.cleanup.policy = delete
log.retention.minutes=7days
指定日志每隔多久检查看是否可以被删除,默认1分钟
log.cleanup.interval.mins=1
log.retention.bytes=-1
log.retention.check.interval.ms=5minutes
log.cleaner.enable=false
log.cleaner.threads =1
log.cleaner.io.max.bytes.per.second=None
log.cleaner.dedupe.buffer.size=500 1024 1024
log.cleaner.io.buffer.size=512*1024
log.cleaner.io.buffer.load.factor =0.9
log.cleaner.backoff.ms =15000
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.delete.retention.ms =1day
log.index.size.max.bytes =10 1024 1024
log.index.interval.bytes =4096
log.flush.interval.messages=None
log.flush.scheduler.interval.ms =3000
log.flush.interval.ms = None
log.delete.delay.ms =60000
log.flush.offset.checkpoint.interval.ms =60000
------------------------------------------- TOPIC 相关 -------------------------------------------
auto.create.topics.enable =true
default.replication.factor =1
num.partitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分尺轿侍区,分区被复制到三个broker上。
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
controller.socket.timeout.ms =30000
controller.message.queue.size=10
replica.lag.time.max.ms =10000
replica.lag.max.messages =4000
replica.socket.timeout.ms=30*1000
replica.socket.receive.buffer.bytes=64*1024
replica.fetch.max.bytes =1024*1024
replica.fetch.wait.max.ms =500
replica.fetch.min.bytes =1
num.replica.fetchers=1
replica.high.watermark.checkpoint.interval.ms =5000
controlled.shutdown.enable =false
controlled.shutdown.max.retries =3
controlled.shutdown.retry.backoff.ms =5000
auto.leader.rebalance.enable =false
leader.imbalance.per.broker.percentage =10
leader.imbalance.check.interval.seconds =300
offset.metadata.max.bytes
----------------------------------ZooKeeper 相关----------------------------------
zookeeper.connect = localhost:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
删除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes
二 CONSUMER 配置
最为核心的配置是group.id、zookeeper.connect
group.id
consumer.id
client.id = group id value
zookeeper.connect=localhost:2182
zookeeper.session.timeout.ms =6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
auto.offset.reset = largest
socket.timeout.ms=30*1000
socket.receive.buffer.bytes=64*1024
fetch.message.max.bytes =1024*1024
auto.commit.enable =true
auto.commit.interval.ms =60*1000
queued.max.message.chunks =10
rebalance.max.retries =4
rebalance.backoff.ms =2000
refresh.leader.backoff.ms
fetch.min.bytes =1
fetch.wait.max.ms =100
consumer.timeout.ms = -1
三 PRODUCER 的配置
比较核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class
metadata.broker.list
request.required.acks =0
request.timeout.ms =10000
send.buffer.bytes=100*1024
key.serializer.class
partitioner.class=kafka.producer.DefaultPartitioner
compression.codec = none
compressed.topics=null
message.send.max.retries =3
retry.backoff.ms =100
topic.metadata.refresh.interval.ms =600*1000
client.id=""
------------------------------------------- 消息模式 相关 -------------------------------------------
producer.type=sync
queue.buffering.max.ms =5000
queue.buffering.max.messages =10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
serializer.class= kafka.serializer.DefaultEncoder
kafka中的key有啥作用?
kafka源码ProducerRecord.java类的注释说明了key的作用,注释如下:
A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.
一个k/v对被发送到kafka。这包含被发送记录的主题名字,一个可选的分区编号,一个可选的key和value。
If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
如果一个有效的partition属性数值被指定,那么在发送记键喊带录时partition属性数值就会被应用。如果没有partition属性数值被指定,而一个key属性被声明的话,一个partition会通过key的hash而被选中。如果既没有key也渗蔽没有partition属性数值被声明稿芦,那么一个partition将会被分配以轮询的方式。
The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for the topic.
record也有一个关联的时间戳。如果用户未提供一个时间戳,producer 将会通过当前的时间标记此record。时间戳最终会被kafka应用,其依赖时间戳类型来配置主题。
If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime},the timestamp in the producer record will be used by the broker。
如果主题是配置用的CREATE_TIME ,在producer记录中的时间戳将会被broker应用。
If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime}, the timestamp in the producer record will be overwritten by the broker with the broker local time when it appends the message to its log.
如果主题被配置用的LogAppendTime,当broker添加消息到它的日志中的时候,producer记录中的时间戳将会被broker覆盖掉,覆盖成以broker本地的时间。
In either of the cases above, the timestamp that has actually been used will be returned to user in {@link RecordMetadata}
对于以上两者之一,确实已被应用的时间戳将会在RecordMetadata中返回给用户。
如何查看kafka消息的key
只有您知道您的账号,然后看一下别人给你发的信息才可以看得到
关于kafkakey和kafka可以脱离zookeeper单独使用吗的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。