标准 专业
多元 极客

Kafka研究院(1)——我们视角下的生产者

Kafka生产者有如下作用:

  • 是我们生产消息的入口。
  • 负责将消息根据用户的配置进行组装。
  • 和Kafka服务端进行交互。

本篇文章将会带着大家了解我们视角下Kafka是如何工作的。

Spring Boot with Kafka

使用Kafka的途径有很多,Kafka有自己的原生的两种KafkaProducer实现(新旧两种KafkaProducer分别使用不同的版本和功能,使用前需要注意)、Spring Kafka以及各个公司对Kafka的封装及客户端实现。
在本节中,我们将使用Spring Boot来展现KafkaProducer的使用。
在Spring Boot中使用Kafka,我们需要引入以下依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

接下来,我们需要对Kafka做一些配置(application.yml),指定Kafka服务端的地址:

spring:
  kafka:
    bootstrap-servers: [127.0.0.1:9092]

接着,到了我们的项目的硬编码部分,如果我们是第一次使用Kafka,还没有创建topic,那么可以在应用程序端,声明创建一个新的topic:

/**
 * @return 创建topic Bean
 */
@Bean
public NewTopic buildSunshineTopic() {
    return new NewTopic("sunshine", 1, (short) 1);
}

当然,如果你的系统比较微,也可以在已有topic的情况下,在配置文件(application.yml)中指定默认的topic:

spring:
  kafka:
    template:
      default-topic: sunshine

由于应用程序级别,Kafka生产消息属于底层通用服务,即任何业务层都可以调用的层级,所以我们创建Service来实现Kafka生产通用服务代码:

@Service("kafkaProduceService")
public class KafkaProduceServiceImpl implements KafkaProduceService {

	@Autowired
	public KafkaProduceServiceImpl(KafkaTemplate<Object, Object> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	private final KafkaTemplate<Object, Object> kafkaTemplate;

	@Override
	public String produce(String message) {
		this.kafkaTemplate.send("sunshine", 8, "", message);
		return "success";
	}
}

我们简单构造一个发送消息的入口:

@RestController
@RequestMapping("/laboratory/kafka")
public class KafkaController {

	@Autowired
	public KafkaController(KafkaProduceService kafkaProduceService) {
		this.kafkaProduceService = kafkaProduceService;
	}

	/**
	 * Kafka生产服务
	 */
	private final KafkaProduceService kafkaProduceService;

	@PostMapping("/produce/{message}")
	public String produce(@PathVariable String message) {
		return kafkaProduceService.produce(message);
	}
}

接着,调用:

/laboratory/kafka/produce/sunshine

发送几条消息进行测试。

测试结果:

Starting offset: 5
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1584334868467 size: 106 magic: 2 compresscodec: NONE crc: 198471501 isvalid: true
| offset: 5 CreateTime: 1584334868467 keysize: 0 valuesize: 10 sequence: -1 headerKeys: [__TypeId__] key:  payload: "sunshine"
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 106 CreateTime: 1584334904169 size: 106 magic: 2 compresscodec: NONE crc: 1400524010 isvalid: true
| offset: 6 CreateTime: 1584334904169 keysize: 0 valuesize: 10 sequence: -1 headerKeys: [__TypeId__] key:  payload: "sunshine"
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 212 CreateTime: 1584334909495 size: 106 magic: 2 compresscodec: NONE crc: 2789437657 isvalid: true
| offset: 7 CreateTime: 1584334909495 keysize: 0 valuesize: 10 sequence: -1 headerKeys: [__TypeId__] key:  payload: "sunshine"

Kafka服务端已经收到并落地我们发送的消息。

KafkaTemplate的作用

我们调用了KafkaTemplate#send()方法,即向指定Kafka集群发送了消息:

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return doSend(producerRecord);
}

无论是简单的直接发送,还是指定自定义时间戳发送,亦或是指定partition序号发送,KafkaTemplate都会将这些信息组装成一个ProdcuerRecord,并继续调用底层的KafkaTemplate#doSend()方法。

ProducerRecord

ProducerRecord供我们所在的producer客户端使用,用于将我们写入的topic、partition序号、消息键值对及时间戳等信息,组装为一个对象。

发送至指定的partition

KafkaTemplate#send()方法调用中可以看出,我们可以发送消息到指定partition,ProducerRecord#partition存储了我们需要发送到的partition序号,默认为null。

partition序号不能指定为负数。

  • 如果我们指定了合法的partition序号,record将会被发送到指定的partition上。
  • 如果我们没有指定partition序号,但是指定了消息的key,Kafka集群会对key做一次hash计算,并将record发送到hash计算出来的值所在的partition上。
  • 如果我们既没有指定partition序号,也没有使用消息的key,那么Kafka集群将会使用算法选取出一个合适的partiton,用于存储record,之前使用的round-robin算法,现在使用的是StickyPartitionCache提供的算法。

时间戳的选择

时间戳和日志的保存策略和日志的切分策略有着密切的关系,并不仅仅是record本身的时间戳,所以对适当的topic选择合适的record时间戳类型,至关重要。

  • 如果我们调用KafkaTemplate#send()方法时给定了时间戳,则使用我们提供的时间戳。
  • 如果我们没有提供时间戳,ProducerRecord会存储当前时间为record的时间戳。
  • 此外Kafka还会根据topic的配置的时间戳策略选取时间戳。
    • topic配置使用{@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}类型的时间戳,broker将会使用ProducerRecord#timestamp。
    • topic配置使用{@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime},broker将会在record追加到日志文件时,覆盖ProducerRecord中存储的时间戳。

Header

消息头是Kafka在KIP-82中新加入的消息部分,加入Header会有以下的优点:

  • HTTP和TCP协议中都有Header部分,所以消息也应该有Header这部分的概念,有助于和其他协议进行对齐。
  • 增加Header可以在集群更自动化的进行路由。
  • 可以在Header中设置事务ID字段,完成端到端的事务流程监控。
  • 可以检查和修改元数据,比如生产record的clientId,幂等时消息的唯一ID,在多集群路由时最初收到此record的clusterId。
  • 消息有时候是需要端到端加密的,一些企业级组件需要访问元数据。
  • 消息是可以进行压缩的,在压缩之后,是无法将元数据添加到K中。

等等。

其他两个参数:

  • key,消息键,在partition选择中已经讲述过,用于在我们未指定partition序号时,使用hash计算出一个partition序号。
  • value,消息主题内容,用于存储具体的消息,相等于HTTP中的payload。

doSend()

在构建完一个ProducerRecord之后,会调用KafkaTemplate#doSend()方法进行发送:

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    // 获取对应类型的producer实例
    final Producer<K, V> producer = getTheProducer();
    this.logger.trace(() -> "Sending: " + producerRecord);
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    Object sample = null;
    // 开启listener的计时器
    if (this.micrometerEnabled && this.micrometerHolder == null) {
        this.micrometerHolder = obtainMicrometerHolder();
    }
    if (this.micrometerHolder != null) {
        sample = this.micrometerHolder.start();
    }
    // 使用producer发送消息,并构造回调方法
    producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
    // 如果开启了autoflush,则执行flush操作
    if (this.autoFlush) {
        flush();
    }
    this.logger.trace(() -> "Sent: " + producerRecord);
    return future;
}
  1. producer的类型分为三种:普通的producer,由Kafka保证幂等性的producer支持事务的producer
  2. 如果我们需要开启listener的计时器,需要将micrometerEnabled设为true,此参数默认为true
  3. 接着,将会使用producer实例发送消息,底层方法KafkaTemplate#send()的返回类型是的Future,KafkaTemplate为我们默认生成了回调任务,用于处理Kafka服务端返回的响应。
  4. 如果开启了autoflush,则会执行flush()操作。

SettableListenableFuture是Spring包装的ListenableFuture,为ListenableFuture又延伸出了set()和setException()两种便捷操作方法。

三种producer类型

三种producer类型底层其实都是KafkaProducer,但是生成KafkaProducer所使用的参数不同。

事务型producer

如我我们在配置中设定了transaction-id-prefix

spring:
  kafka:
    transaction-id-prefix: sunshine

那么就会默认开启事务型的producer。

在开启事务型producer的同时,也会默认同时开启幂等性。

幂等型producer

我们可以配置属性enable.idempotence=true的方式开启幂等型producer,我们可以为ProducerFactory添加此配置。

@Bean
public ProducerFactory<String, String> buildProducerFactory() {
    Map<String, Object> kafkaProducerConfigMap = new HashMap<>();
    kafkaProducerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return new DefaultKafkaProducerFactory<>(kafkaProducerConfigMap);
}

ProducerConfig是kafka-clients依赖提供给我们使用的配置属性类,里面有各个客户端的属性配置及用途释义。

幂等型producer将会保证KafkaProducer在重试时不会出现消息重复的现象,也就是将发送语义从至少一次提升至准确一次

普通型Producer

普通型producer即为我们使用场景最多的producer类型。

getTheProducer()

经过上面对producer的类型的简单介绍后,我们继续看KafkaTemplate#getTheProducer()方法:

private Producer<K, V> getTheProducer() {
    // 是否已经配置开启producer的事务性
    boolean transactionalProducer = this.transactional;
    if (transactionalProducer) {
        // 如果已经配置开启prodcuer的事务性,则判断当前场景下,是否已经进入事务
        boolean inTransaction = inTransaction();
        // 校验事务的状态,如果不允许非事务性的生产过长,并且当前调用线程没有处于事务中
        // 进行提醒,建议使用KafkaTemplate#executeInTransaction()或@Transactinal注解
        Assert.state(this.allowNonTransactional || inTransaction,
                     "No transaction is in process; "
                     + "possible solutions: run the template operation within the scope of a "
                     + "template.executeInTransaction() operation, start a transaction with @Transactional "
                     + "before invoking the template method, "
                     + "run in a transaction started by a listener container when consuming a record");
        // 如果允许非事务的生产过程,并且当前调用线程也不处于事务中,则可以不使用事务producer
        if (!inTransaction) {
            transactionalProducer = false;
        }
    }
    // 在当前调用线程处于事务中时,需要使用事务型producer
    if (transactionalProducer) {
        // 如果已经存在producer实例,直接从KafkaTemplate的ThreadLocal中获取producer实例即可
        Producer<K, V> producer = this.producers.get();
        if (producer != null) {
            return producer;
        }
        // 如果事务是刚刚开启,还没有创建producer实例,则使用producerFactory工厂,配置的transactinId和事务默认等待时间创建新的producer实例
        KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
            .getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
        return holder.getProducer();
    }
    // 接下来的两种情况,均由DefaultKafkaProducerFactory提供实现
    // 如果允许非事务型的producer,则创建非实物型producer实例
    else if (this.allowNonTransactional) {
        return this.producerFactory.createNonTransactionalProducer();
    }
    else {
        // 根据实际情况创建producer实例
        return this.producerFactory.createProducer();
    }
}
  1. 按照producer的优先级顺序来创建:事务型=>幂等型=>普通型。
  2. this.tranactional的值由transaction-id-prefix的值来确定,而当前场景是否使用事务型producer,则根据当前线程的环境、已经实例化的ProducerFactory事务管理器来进行决定。
  3. 如果需要使用事务型producer,则会从当前已经初始化的producer缓存中获取,或者使用ProducerFactory初始化一个事务型producer。
  4. allowNonTransactional属性是2.4.3版本新增的特性的,用于在KafkaTemplate为全局事务型时,在未开启事务时,也可以使用普通型producer发送消息。
  5. 其他情况:
    1. 配置中开启事务,但是当前消息发送上下文没有调用开启事务,也不允许非事务型producer的创建,则也会创建一个事务型producer。
    2. 配置中为开启事务,也不允许非实物型Producer的创建,则会创建一个普通的producer事务。

无论是事务型producer还是非事务型的producer,都会调用DefaultKafkaProducerFactory#doCreateProducer()创建producer实例,浓浓的Spring内味儿。

private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
    // 如果存在事务ID前缀
    if (txIdPrefix != null) {
        // 如果需要避免僵尸实例问题,则可以在ProducerFactory中开启producerPerConsumerPartition配置
        // 僵尸实例问题:某个producer实例临时失联,新的producer会代替失联的实例,而失联的producer恢复后,就会出现两条同样的消息
        if (this.producerPerConsumerPartition) {
            return createTransactionalProducerForPartition(txIdPrefix);
        }
        else {
            // 如果我们可以自己处理重复调用问题,则直接创建事务型producer实例
            return createTransactionalProducer(txIdPrefix);
        }
    }
    // 如果需要为每个线程创建producer
    // 为什么需要为每个线程创建producer
    // 多个线程调用同一个producer实例时,一个线程进行flush()操作,会阻塞其他的线程
    if (this.producerPerThread) {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
        // 则为每个线程单独创建producer
        if (tlProducer == null) {
            tlProducer = new CloseSafeProducer<>(createKafkaProducer());
            this.threadBoundProducers.set(tlProducer);
        }
        return tlProducer;
    }
    // 此种情况默认为我们经常使用的producer实例
    // 即进程内的线程共用同一个producer
    if (this.producer == null) {
        synchronized (this) {
            // 需要双重校验所保证线程安全
            if (this.producer == null) {
                // 创建producer实例
                this.producer = new CloseSafeProducer<>(createKafkaProducer());
            }
        }
    }
    return this.producer;
}

此时,producer类型的区分取决于是否配置了tranaction-id-prefix属性。

但是此时不仅仅考虑的是创建类型的问题,还要考虑producer实例运行环境的问题。

在创建事务型producer时:

  • 如果我们可以自行处理僵尸实例问题,则直接创建一个向topic发送消息的producer实例。
  • 如果需要Kafka客户端来保证消息发送唯一性,则Kafka客户端会为topic的每个partition维护一个producer实例。

若需要Kafka客户端保证消息发送的唯一性,则需要进行设置defaultKafkaProducerFactory.setProducerPerConsumerPartition(true)的同时,还需要在发送消息是指定transaction-id的后缀:TransactionSupport.setTransactionIdSuffix(“1”)

无论是为全局还是每个partition创建事务型producer,最终都会调用DefaultKafkaProducerFactory#doCreateTxProducer()方法:

private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
                                                   @Nullable Consumer<CloseSafeProducer<K, V>> remover) {
    Producer<K, V> newProducer;
    // producuer的配置
    Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
    newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, prefix + suffix);
    if (this.clientIdPrefix != null) {
        newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
                               this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
    }
    // 创建producer实例
    newProducer = createRawProducer(newProducerConfigs);
    // 开启producer的事务
    newProducer.initTransactions();
    // 使用Spring包装的一层,可友好关闭的producer实例
    return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
                                   (String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
}
  1. 读取我们手动配置以及默认的Kafka配置,创建一个原生的KafkaProducer。
  2. 如果我们指定了客户端前缀,Spring还会为每个客户端生成一个唯一的编号,供Kafka服务端进行识别。
  3. Spring为了处理Bean的生命周期,还将原生的KafkaProducer包装为CloseSafeProducer,这样可以更友好的关闭KafkaProducer实例。

除了开启事务时,为每个partition创建producer实例,Spring Kafka还支持为每个线程创建KafkaProducer实例。

为什么需要为每个线程创建一个producer实例?

是因为如果多个线程同时调用一个producer实例,那么在其中一个线程调用flush()对缓冲区进行清理时,其他线程均会阻塞等待flush()调用结束。

为每个线程创建KafkaProducer实例的实现思路比较简单,是在KafkaTemplate的ThreadLocal中写入KafkaProducer实例。

如何开启每个线程创建一个producer实例?

defaultKafkaProducerFactory.setProducerPerThread(true);

其他情况下,将会以同步的方式创建一个进程内共享的producer实例,

而在创建KafkaProducer时,直接使用原生的API进行初始化:

protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
    // 直接实例化Kafka底层的API
    return new KafkaProducer<>(configs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
}
赞(3) 投币
未经允许不得转载:随遇而安 » Kafka研究院(1)——我们视角下的生产者
分享到: 更多 (0)

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫打赏

微信扫一扫打赏