kafkaflink(kafkaflink1136支持kafka3)

本篇文章给大家谈谈kafkaflink,以及kafkaflink1136支持kafka3对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

4.一文搞定:Flink与Kafka之间的精准一次性

在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。

本次的聊法,还是要通过以kafka(source)-Flink,Flink(source)-Kafka来分别展开讨论。

kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProduce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。

在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面最好的实现。

其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProduce连接器内部去看一看:

这也就表明了,当数据通过Flink发送给sink端Kafka的时候,禅枣是经历了两个阶段的处理的。第一阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobManager发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。

这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobManager向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点隐袭茄保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmanager来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。

1.必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。

2.在FlinkKafakProducer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的最后一个参数输入如下:

3.配置Kafka读取数据的隔离级别

在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的第一阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:

4.事务超时时间

这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,假如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。

截止到目前为止,基本上把有关于状态维护的一些东西都灶察说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我差一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。

kafka与Flink集成问题记录

Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)

at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2291)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)

at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)

at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)

at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)

at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)

at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)

... 4 more

Kafka库与Flink的反向类加载方法不兼容,修改 conf/flink-conf.yaml 并重启Flink

classloader.resolve-order: parent-first

flink与kafka结合

flink提供了一个瞎铅禅特有的kafka connector去读写kafka topic的数据。flink消费kafka数据,并不是完全通过跟踪kafka消费组的offset来实现去保证exactly-once的语义,而是flink内部去跟踪offset和做checkpoint去实现exactly-once的语义

flink与kafka整合,相应版本对于的maven依赖如下表

maven依赖举例

flink.version1.7.0/flink.version

scala.binary.version2.11/scala.binary.version

scala.version2.11.12/scala.version

dependency

  groupIdorg.apache.flink/groupId

  artifactIdflink-streaming-scala_${scala.binary.version}/artifactId

  version${flink.version}/version

  scopeprovided/scope

/dependency

flink利用FlinkKafkaConsumer来读取访问kafka, 根据kafka版本不同FlinkKafkaConsumer的类名也会变化,会变为FlinkKafkaConsumer

[08,09,10...]后面的数字就是对于的kafka的大版本号 。

初始化FlinkKafkaConsumer 需要如下参数

1、topic名字,用来指定消费一个或者多个topic的数据

2、kafka的配置信息,如zk地址端激销口,kafka地址端口等

3、反序列化器(schema),对消费数据选择一个反序列化器进行反序列化。

flink kafka的消磨尘费端需要知道怎么把kafka中消息数据反序列化成java或者scala中的对象。用户通过使用DeserializationSchema,每一条kafka的消息都会作用于DeserializationSchema的eserialize(byte[] message)方法。来将kafka的消息转换成用户想要的结构。

用户通过自定义schema将接入数据转换成自定义的数据结构,主要通过实现KeyedDeserializationSchema或者DeserializationSchema接口来完成,可以自定义。flink内置的 对DeserializationSchema 的实现有

public class SimpleStringSchema implements DeserializationSchemaString

public class TypeInformationSerializationSchemaT implements DeserializationSchemaT

对 KeyedDeserializationSchema的实现有

public class TypeInformationKeyValueSerializationSchemaK, V implements KeyedDeserializationSchemaTuple2K, V

public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchemaObjectNode

例如:

val myConsumer = new FlinkKafkaConsumer010[String]("topic",new SimpleStringSchema,p)

public class MySchema implements KeyedDeserializationSchemaKafkaMsgDTO {

    @Override

    public KafkaMsgDTO deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {

        String msg = new String(message, StandardCharsets.UTF_8);

        String key = null;

        if(messageKey != null){

            key = new String(messageKey, StandardCharsets.UTF_8);

        }

        return new KafkaMsgDTO(msg,key,topic,partition,offset);

    }

    @Override

    public boolean isEndOfStream(KafkaMsgDTO nextElement) {

        return false;

    }

    @Override

    public TypeInformationKafkaMsgDTO getProducedType() {

        return getForClass(KafkaMsgDTO.class);

    }

}

dependency

  groupIdorg.apache.flink/groupId

  artifactIdflink-connector-kafka-base_2.11/artifactId

  version1.7.0/version

/dependency

public class KafkaMsgDTO {

    private String topic;

    private int partition;

    private long offset;

    private String mesg;

    @Override

    public String toString() {

        return "KafkaMsgDTO{" +

                "topic='" + topic + '\'' +

                ", partition=" + partition +

                ", offset=" + offset +

                ", mesg='" + mesg + '\'' +

                ", key='" + key + '\'' +

                '}';

    }

    private String key;

    public KafkaMsgDTO(){

    }

    public KafkaMsgDTO(String mesg,String key,String topic,int partition,long offset){

        this.mesg = mesg;

        this.key = key;

        this.topic = topic;

        this.partition = partition;

        this.offset = offset;

    }

    public String getKey() {

        return key;

    }

    public void setKey(String key) {

        this.key = key;

    }

    public String getTopic() {

        return topic;

    }

    public void setTopic(String topic) {

        this.topic = topic;

    }

    public int getPartition() {

        return partition;

    }

    public void setPartition(int partition) {

        this.partition = partition;

    }

    public long getOffset() {

        return offset;

    }

    public void setOffset(long offset) {

        this.offset = offset;

    }

    public String getMesg() {

        return mesg;

    }

    public void setMesg(String mesg) {

        this.mesg = mesg;

    }

}

val myConsumer = new FlinkKafkaConsumer010[KafkaMsgDTO]("topic",new MySchema(),p)

//    myConsumer.setStartFromEarliest()     

//从最早开始消费,消费过的数据会重复消费,从kafka来看默认不提交offset.

//    myConsumer.setStartFromLatest()       

//从最新开始消费,不消费流启动前未消费的数据,从kafka来看默认不提交offset.

      myConsumer.setStartFromGroupOffsets()

//从消费的offset位置开始消费,kafka有提交offset,这是默认消费方式

//如果没有做checkpoint 数据进入sink就会提交offset,如果sink里面逻辑失败。offset照样会提交,程序退出,如果重启流,消费失败的数据不会被重新消费

//如果做了checkpoint 会保证数据的端到端精准一次消费。sink里面逻辑失败不会提交offset

env.enableCheckpointing(5000);

val stream = env.addSource(myConsumer)

stream.addSink(x={

  println(x)

  println(1/(x.getMesg.toInt%2))//消息是偶数就会报错,分母为0

  println(x)

})

val stream = env.addSource(myConsumer)

//实验表明如果sink处理逻辑有一部线程在跑,如果异步线程失败。offset照样会提交。

stream.addSink(x={

  println(x)

  new Thread(new Runnable {

    override def run(): Unit = {

      println(1/(x.getMesg.toInt%2))//消息是偶数就会报错,分母为0

    }

  }).start()

  println(x)

})

val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)

specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

[img]

flink消费kafka细节

# flink消费kafka细节

Apache kafka connector提供对Kafka服务的事件流的访问。Flink提供了特殊的Kafka连接器,用于从Kafka主题读写数据。 Flink Kafka Consumer与Flink的检查点机制集成在一起,以提供一次精确的处理语义。 为此,Flink不仅仅依赖于Kafka的消费者群体偏移量跟踪,还内部跟踪和检查这些好樱偏移量。

请为您的用例和环境选择一厅袜含个包(Maven项目ID)和类名扮笑。 对于大多数用户来说,FlinkKafkaConsumer08(flink-connector-kafka的一部分)是合适的。

| Maven Dependency                | Supported since | Consumer and Producer Class name            | Kafka version | Notes                                                        |

| :------------------------------ | :-------------- | :------------------------------------------ | :------------ | :----------------------------------------------------------- |

| flink-connector-kafka-0.8_2.11  | 1.0.0          | FlinkKafkaConsumer08 FlinkKafkaProducer08  | 0.8.x        | Uses the [SimpleConsumer]() API of Kafka internally. Offsets are committed to ZK by Flink. |

| flink-connector-kafka-0.9_2.11  | 1.0.0          | FlinkKafkaConsumer09 FlinkKafkaProducer09  | 0.9.x        | Uses the new [Consumer API]() Kafka. |

| flink-connector-kafka-0.10_2.11 | 1.2.0          | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x        | This connector supports [Kafka messages with timestamps]() both for producing and consuming. |

| flink-connector-kafka-0.11_2.11 | 1.4.0          | FlinkKafkaConsumer011 FlinkKafkaProducer011 | 0.11.x        | Since 0.11.x Kafka does not support scala 2.10. This connector supports [Kafka transactional messaging]() to provide exactly once semantic for the producer. |

| flink-connector-kafka_2.11      | 1.7.0          | FlinkKafkaConsumer FlinkKafkaProducer      | = 1.0.0      | This universal Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated flink-connector-kafka-0.11_2.11 and flink-connector-kafka-0.10_2.11 respectively. |

在创建kafka consumer时,需要指定一些参数

```java

Properties properties = new Properties();

// kafka broker地址,用逗号隔开

properties.setProperty("bootstrap.servers", "localhost:9092");

// zookeeper机器地址,仅在Kafka 0.8用到

properties.setProperty("zookeeper.connect", "localhost:2181");

// kafka消费者的group.id

properties.setProperty("group.id", "test");

DataStreamString stream = env

.addSource(new FlinkKafkaConsumer08("topic", new SimpleStringSchema(), properties));

```

### flink消费kafka时的容错

启用flink检查点之后,flink会定期checkpoint offset,万一作业失败,Flink将把流式程序恢复到最新检查点的状态,并从存储在检查点的偏移量开始重新使用Kafka的记录。

```java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(5000); // checkpoint every 5000 msecs

```

### flink worker和kafka partition对应关系

partition会分配给flink并行的task,当task比partition数量多时,会有task进程闲置

![Fewer Partitions](./fewer_partition.png)

当kafka的partition比flink task多时,一个task会分配到多个partition

![More Partitions](./more_partition.png)

### flink如何保证kafka的恰好一次处理

flink kafka consumer和flink的检查点机制紧密集成,flink每次从kafka队列读到新数据都会更新offset到state中,flink kafka consumer是一个stateful operator,其state存的就是offset。

### 从Kafka主题阅读时,Flink如何处理背压?

如果下游无法以相同的速度处理所有传入数据,则像Flink这样的流媒体系统必须能够减慢flink kafka consumer消费的速度。这就是所谓的反压处理。 Flink的Kafka consumer自带处理backpressure的机制:一旦下游无法跟上Kafka消费的速度,Flink就会放慢来自Kafka的消息的消费,从而减少来自代理的请求。由于代理将所有消息都保留在磁盘上,因此它们也可以提供过去的消息。一旦操作员再次加速,Flink将全速消耗累积的消息。这种行为使Kafka非常适合作为流源和Flink之间的缓冲区,因为它为负载高峰时的事件提供了持久的缓冲区。

### kafka生产者API的使用

flink 中已经预置了 kafka 相关的数据源实现 FlinkKafkaConsumer010 ,先看下具体的实现:

kafka 的 Consumer 有一堆实现,不过最终都是继承自 FlinkKafkaConsumerBase ,而这个抽象类则是继承 RichParallelSourceFunction ,是不是很眼熟,跟自定义 mysql 数据源继承的抽象类 RichSourceFunction 很类似液空。

可以看到,这里有很多构造函数,我们直接使用即可。

说明:铅岩

a、这里直接使用 properties 对象来设置 kafka 相关配置,比如 brokers 、 zk 、 groupId 、 序列化 、 反序列化 等。

b、使用 FlinkKafkaConsumer010 构造函数,指定 topic 、 properties 配置

c、 SimpleStringSchema 仅针对 String 类型数据的序列化及反序列化,如果 kafka 中消息的内容不是 String ,则会报错;看下 SimpleStringSchema 的定义:

d、这里直接把获取槐埋御到的消息打印出来。

关于kafkaflink和kafkaflink1136支持kafka3的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表