标准 专业
多元 极客

Kafka研究院(2)——消息的异步发送

在上一篇文章中我们提到,KafkaProducer会将每次发送的消息,追加到一个RecordAccumulator中,由RecordAccumulator负责消息的实际发送。

本篇文章将对RecordAccumulator进行原理分析。

过长流程的代码,会逐段进行分析。

消息追加

KafkaProducer#doSend()方法中,我们通过调用RecordAccumulator#append()方法,将消息追加到RecordAccumulator中:

// 累加器的计数器操作
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
// 默认是一个空header
if (headers == null) headers = Record.EMPTY_HEADERS;

首先是全局计数器和当前消息头header的默认设置。

// 校验我们是已有一个在运行中的batch deque
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 进行同步操作
synchronized (dq) {
    if (closed)
        throw new KafkaException("Producer closed while send in progress");
    // 尝试写入到batch中
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
    if (appendResult != null)
        return appendResult;
}

接下来,我们会获取当前partition的存储消息的双向队列

如果成功获取存储当前partition消息的存储队列,则尝试将消息追加到此队列中,并返回追加结果。

getOrCreateDeque()

既然需要获取partition在缓冲区存储的消息队列,那么必然存在创建的情况,我们继续看getOrCreateDeque()方法的具体实现:

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    // 从缓存中获取
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    // 没有指定的分区的batch deque,创建一个新的
    d = new ArrayDeque<>();
    // 避免出现并发的情况,获取覆盖的值,如果其他线程已经创建了新deque,则使用此deque,否则使用当前线程创建的deque
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
  1. 先从缓存中获取指定partition的消息队列,如果指定的消息队列存在,则直接返回此消息队列。
  2. 如果不存在指定的消息队列,则需要为指定的partition创建一个新的消息队列
  3. 消息队列的类型是ArrayDeque
  4. 此时其他生产者线程可能也会往给定的partition发送消息,可能已经创建了一个消息队列,此时做一下兼容:
    • 如果已经有线程创建了消息队列,则返回已创建的消息队列。
    • 否则,使用当前线程创建的消息队列。

选择batch追加消息

tryAppend()会尝试将给定的消息写入到当前partition的消息队列中:

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    // 获取最后一个ProducerBatch,最后一个是要写入的batch
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        // 如果返回null,证明最后一个deque已经写不下了
        if (future == null)
            // 关闭当前batch集合的写入
            last.closeForRecordAppends();
        else
            // 组装追加结果
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}

此时我们发现,其实队列中存放的,并不是每一条消息实体,而是一个batch

batch相当于批记录,存储了一定数量的record,一个批记录满了,则不会继续追加新record。

  1. 首先,我们获取最后一个batch。
  2. 如果当前队列没有batch,则本次无法追加消息,直接返回。
  3. 如果存在batch,则尝试向此batch中追加消息。
  4. 如果追加结果返回null,则证明最后一个batch已经满了,无法写入了,返回因为batch已满无法写入的追加结果。

向batch中追加消息

ProducerBatch#tryAppend()用于向给定batch中追加消息:

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, 				Callback callback, long now) {
    // 校验当前batch还能继续写入
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        // 不能继续写入,返回null
        return null;
    } else {
        // 向MemoryRecords中追加内容
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        // 计算当前batch最大的record的大小
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                                                                                                        recordsBuilder.compressionType(), key, value, headers));
        // 更新追加时间
        this.lastAppendTime = now;

        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length,
                                                               Time.SYSTEM);
        // 我们必须确保每个返回给开发者的future,以防batch出现需要分隔成为几个新的batch,然后再重新发送
        thunks.add(new Thunk(callback, future));
        // 操作record计数器
        this.recordCount++;
        return future;
    }
}
  1. 首先,校验当前batch是否仍有写入空间,如果不能继续写入,则返回null。
  2. ProducerBatch使用MemoryRecordsBuilder写入实际的消息。
  3. 计算当前batch中最大的record的大小
  4. 更新最后一次追加消息的时间戳。
  5. 构建消息的发送的异步任务,并添加到异步任务列表中,等待回调执行,此时主要避免因为一个batch过大,导致需要拆分为几个小batch重新发送带来的问题。
  6. 递增当前batch的record计数器。

向MemoryRecords中追加消息

调用MemoryRecordsBuilder#append()方法,会向MemoryRecords中追加消息:

private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                ByteBuffer value, Header[] headers) {
    try {
        ...
        if (magic > RecordBatch.MAGIC_VALUE_V1) {
            appendDefaultRecord(offset, timestamp, key, value, headers);
            return null;
        } else {
            return appendLegacyRecord(offset, timestamp, key, value, magic);
        }
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}

由于Kafka存在KafkaProducer客户端版本不同的问题,此时会分版本进行处理,暂且可以认为:

MAGIC_VALUE_V20.10.2左右的版本。

MAGIC_VALUE_V10.9.0左右的版本。

MAGIC_VALUE_V0:更老的版本。

此时对于的是MAGIC_VALUE_V2MAGIC_VALUE_V0MAGIC_VALUE_V1做了不同的写入处理。

消息版本V2的写入处理

MemoryRecordsBuilder#appendDefaultRecord()用于追加V2版本的消息:

private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                 Header[] headers) throws IOException {
    // 确认数据输出流处于开启状态
    ensureOpenForRecordAppend();
    // 计算相对偏移量
    int offsetDelta = (int) (offset - baseOffset);
    // 计算时间偏移量
    long timestampDelta = timestamp - firstTimestamp;
    // 获取写入的字节数量
    // 注意:写入的时候,写入的是基于baseOffset的相对偏移量,时间戳也是相对时间戳
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    // 计数,标识位更新(上一次写入的绝对偏移量,时间戳)
    recordWritten(offset, timestamp, sizeInBytes);
}
  1. 首先,确认数据输出流DataOutputStream是否关闭。
  2. 接着,计算追加的消息,相对于当前batch的第一条消息的相对偏移量和偏移时间。
  3. 向数据流中写入消息,注意,此时使用的都是相对的偏移量和时间戳。
  4. 递增当前MemoryRecords中record数量,记录当前最近一次写入的偏移量和时间戳。

我们继续来看如何写入MemoryRecords中的,DefaultRecord#writeTo()

public static int writeTo(DataOutputStream out,
                          int offsetDelta,
                          long timestampDelta,
                          ByteBuffer key,
                          ByteBuffer value,
                          Header[] headers) throws IOException {
    // 获取key,value的字节大小,使用Protobuf压缩后的
    int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
    // 使用varint写入消息key,value
    ByteUtils.writeVarint(sizeInBytes, out);
	// 现在record attributes还没有使用
    byte attributes = 0;
    out.write(attributes);
    // 相对时间戳,varint类型
    ByteUtils.writeVarlong(timestampDelta, out);
    // 相对偏移量,varint类型
    ByteUtils.writeVarint(offsetDelta, out);

    if (key == null) {
        // 如果没有key,则写入-1作为占位,也是varint类型
        ByteUtils.writeVarint(-1, out);
    } else {
        // 使用varint写入key的大小,再正常的写入key
        int keySize = key.remaining();
        ByteUtils.writeVarint(keySize, out);
        Utils.writeTo(out, key, keySize);
    }
    // 写入value
    if (value == null) {
        ByteUtils.writeVarint(-1, out);
    } else {
        int valueSize = value.remaining();
        ByteUtils.writeVarint(valueSize, out);
        Utils.writeTo(out, value, valueSize);
    }

    if (headers == null)
        throw new IllegalArgumentException("Headers cannot be null");
    // 写入header的长度
    ByteUtils.writeVarint(headers.length, out);
    // 写入header信息
    for (Header header : headers) {
        String headerKey = header.key();
        if (headerKey == null)
            throw new IllegalArgumentException("Invalid null header key found in headers");

        byte[] utf8Bytes = Utils.utf8(headerKey);
        ByteUtils.writeVarint(utf8Bytes.length, out);
        out.write(utf8Bytes);

        byte[] headerValue = header.value();
        if (headerValue == null) {
            ByteUtils.writeVarint(-1, out);
        } else {
            ByteUtils.writeVarint(headerValue.length, out);
            out.write(headerValue);
        }
    }
    // record的总大小(已经算上了varint)+ sizeInBytes的varint的大小
    return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}

代码很长,有点占篇幅,其实主要将消息大小消息属性相对时间戳相对偏移量key的大小key值value大小value值headers依次使用Protobuf写入DataOutputStream中,最后返回消息大小消息内容压缩后的大小

MemoryRecordsBuilder中的DataOutputStream

DataOutputStream用于装饰其他输出流,是底层FilterOutputStream的高级封装,可以直接写入Java的基本类型。

MemoryRecordsBuilder的创建的分析,将在下文给出。

更老的消息写入处理

更老的消息写入并非采用了其他序列化手段,而是直接将消息转化为ByteBuffer后,写入数据流中,MemoryRecordsBuilder#appendLegacyRecord()

private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, byte 			magic) throws IOException {
    ensureOpenForRecordAppend();
    if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
        timestamp = logAppendTime;

    int size = LegacyRecord.recordSize(magic, key, value);
    AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);

    if (timestampType == TimestampType.LOG_APPEND_TIME)
        timestamp = logAppendTime;
    long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
    recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
    return crc;
}
  1. 首先,也需要确认数据流是否关闭,此数据和V2版本使用的数据流是相同的。
  2. 接着,计算消息版本类型+key大小+value大小的消息总大小。
  3. 然后,向消息的头部写入相对偏移量和消息大小。
  4. 如果用于指定了消息追加的类型是LOG_APPEND_TIME类型,则添加时间戳。
  5. 然后,一次向数据流中写入消息版本类型消息追加时间key值value值压缩类型消息追加时间戳类型
  6. 递增当前MemoryRecords中record数量,记录当前最近一次写入的偏移量和时间戳。

消息追加成功的结果

在消息追加成功后,会返回一个FutureRecordMetadata对象,此对象实现了Future<RecordMetadata>类型的接口:

FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length,
                                                                   Time.SYSTEM);

主要包含如下内容:

  • 当前batch的生产消息的请求结果
  • 当前消息的相对偏移量
  • 创建record的时间戳
  • 校验码
  • 序列化后的key的大小
  • 序列化后的value的大小
  • 时间戳
  • 由于可能存在batch过大,被分为多个batch的情况,需要一个分离batch后的record数据的连接点

这些异步任务会放在当前batch的缓存中,在batch发送后,并获取Kafka服务端的响应后,由专门的线程负责执行。

消息追加失败的结果

消息追加后,如果返回的异步任务是null,证明消息没有追加成功,此时证明当前batch已满,无法继续追加record,需要对当前batch进行关闭:

public void closeForRecordAppends() {
    recordsBuilder.closeForRecordAppends();
}

public void closeForRecordAppends() {
    if (appendStream != CLOSED_STREAM) {
        try {
            appendStream.close();
        } catch (IOException e) {
            throw new KafkaException(e);
        } finally {
            appendStream = CLOSED_STREAM;
        }
    }
}

关闭当前batch即为直接将batch的数据流进行关闭

消息追加失败,并非到此就结束了,KafkaProducer还会继续尝试创建新的batch,用于追加消息。

创建新的batch

计算新batch的大小

我们继续看RecordAccumulator#append()方法的接下来的部分:

// 获取producer最大的batch大小
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// 估算消息的字节数大小
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 申请一块用于追加当前record的对外内存
buffer = free.allocate(size, maxTimeToBlock);

由于之前存在的batch已经不允许追加消息了,那么接下来我们需要创建一个新的,在创建一个新的batch时,首先要做的就是计算新batch的大小

这里batchSize是我们设定的每个batch的大小,同时我们也要预估当前需要追加的消息大小,取二者之间的一个最大值

接下来,用这个最大值申请一块堆外内存。

创建新batch

synchronized (dq) {
    // 获取batch队列锁之后,需要重新校验producer的状态
    if (closed)
        throw new KafkaException("Producer closed while send in progress");
    // 再度尝试追加record
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
    if (appendResult != null) {
        // 此时可能是Sender线程将batch发送,多出一个空间
        return appendResult;
    }
    // 此时仍没有可追加的空间
    // 则使用给定内存创建一个新的batch,并向新batch中追加record
    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
    FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                                                                         callback, time.milliseconds()));
    // 将batch添加到当前partition的batch队列中
    dq.addLast(batch);
    // 此batch为新batch,必定为未发送的batch
    incomplete.add(batch);

    // 由于在锁中,batch还在使用这块对外内存,所以此时不要进行对外内存的接触分配
    buffer = null;
    return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
  1. 由于向当前partition的消息队列中放入新的batch,所以创建新batch的过程将使用同步操作
  2. 由于此时可能发送线程将当前partition的消息队列发送到Kafka服务端,所以再度尝试追加一次消息
  3. 如果追加成功,直接返回消息追加的异步任务。
  4. 如果仍然没有当前消息的内存空间,则开始着手创建一个新的batch。
  5. 我们先使用当前消息的版本号,创建一个MemoryRecordsBuilder
  6. 接着使用创建好的MemoryRecordsBuilder创建ProducerBatch
  7. 然后尝试向新创建的batch中追加消息
  8. 最后将创建好的batch放入到当前partition的消息队列中。
  9. 由于此batch是新创建的,必定为未发送的batch,则添加到未收到发送响应的缓存中。

消息版本号一般由当前运行的Kafka版本决定。

构建MemoryRecordsBuilder

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
    // Kafka事务版本规则校验
	...
    return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
}

构建MemoryRecordsBuilder时,使用的是我们刚刚申请的一块堆外内存消息版本号压缩类型创建时间戳类型,和基础偏移量

构建ProducerBatch

public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
    this.createdMs = createdMs;
    this.lastAttemptMs = createdMs;
    this.recordsBuilder = recordsBuilder;
    this.topicPartition = tp;
    this.lastAppendTime = createdMs;
    this.produceFuture = new ProduceRequestResult(topicPartition);
    this.retry = false;
    this.isSplitBatch = isSplitBatch;
    float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
                                                                            recordsBuilder.compressionType());
    recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}

构建ProducerBatch时,使用的是当前topic-partition对象,我们构建好的MemoryRecordsBuilder,以及当前时间戳

  • 初始化ProducerBatch的创建时间,最近一次尝试的时间和最近一次追加的时间均为当前时间戳。
  • 根据当前topic-partition信息,构建生产请求结果,生产请求结果用于组装每次追加消息返回的异步任务。
  • 由于内存空间不足而创建的新batch,必然不是因为batch过大而分离出的batch。
  • 计算并设置当前topic和压缩类型的压缩率。

新batch的消息追加也是调用ProducerBatch#append()方法,也就是和第一次追加时调用的方法的相同,在此不再赘述。

通过上面的原理分析,我们了解到,我们发送的消息经由RecordAccumulator,存储在消息所需要发送的partition的专属消息队列中,消息队列的最小集合单位是batch,使用的是堆外内存进行存储,返回给我们的是一个包含消息存储信息和发送请求结果的异步任务。

消息发送

前半部分对消息的写入缓冲区进行了原理分析,接下来将对消息的发送流程进行分析。

构建发送线程Sender

我们在构建KafkaProducer时,会实例化一个后台线程Sender

KafkaProducer(Map<String, Object> configs,
				  Serializer<K> keySerializer,
				  Serializer<V> valueSerializer,
				  ProducerMetadata metadata,
				  KafkaClient kafkaClient,
				  ProducerInterceptors interceptors,
				  Time time) {
    ...
    this.sender = newSender(logContext, kafkaClient, this.metadata);   
        
}

继续看KafkaProducer#newSender()方法:

Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
    // 配置的发送请求超时时间
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    // 构建
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
    // 生产者的计数器
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
    // 如果已有和Kafka服务端建立连接的客户端,则使用此客户端,否则新创建一个客户端
    KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
        new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                     this.metrics, time, "producer", channelBuilder, logContext),
        metadata,
        clientId,
        maxInflightRequests,
        producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        requestTimeoutMs,
        ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
        time,
        true,
        apiVersions,
        throttleTimeSensor,
        logContext);
    // 是否需要进行重试
    int retries = configureRetries(producerConfig, transactionManager != null, log);
    // 发送请求需要的Kafka服务端副本的响应数量
    short acks = configureAcks(producerConfig, transactionManager != null, log);
    // 构建发送Sender线程
    return new Sender(logContext,
                      client,
                      metadata,
                      this.accumulator,
                      maxInflightRequests == 1,
                      producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                      acks,
                      retries,
                      metricsRegistry.senderMetrics,
                      time,
                      requestTimeoutMs,
                      producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                      this.transactionManager,
                      apiVersions);
}

这里包含了很多我们了解Kafka时的配置:

  • KafkaProducer每次发送时的超时时间
  • 与Kafka服务端进行交互的KafkaClient对象。
  • 如果发送失败,是否需要进行重试
  • 生产消息,Kafka服务端副本的响应数量
  • 构建发送线程。

继续来看Sender的构造方法:

public Sender(LogContext logContext,
              KafkaClient client,
              ProducerMetadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              SenderMetricsRegistry metricsRegistry,
              Time time,
              int requestTimeoutMs,
              long retryBackoffMs,
              TransactionManager transactionManager,
              ApiVersions apiVersions) {
    this.log = logContext.logger(Sender.class);
    // 同Kafka服务端进行交互的客户端
    this.client = client;
    // 关联的RecordAccumulator
    this.accumulator = accumulator;
    // KafkaProducer的元数据
    this.metadata = metadata;
    // 是否需要保证消息的顺序性,与max.in.flight.requests.per.connection参数设置有关
    this.guaranteeMessageOrder = guaranteeMessageOrder;
    // 请求允许的最大字节数
    this.maxRequestSize = maxRequestSize;
    // 线程的运行状态
    this.running = true;
    // 集群副本的必须响应数量
    this.acks = acks;
    // 消息发送失败的重试次数
    this.retries = retries;
    // 创建时间戳
    this.time = time;
    // 发送线程计数器
    this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time);
    // 发送请求的等待时间
    this.requestTimeoutMs = requestTimeoutMs;
    // 发送失败后的等待时间
    this.retryBackoffMs = retryBackoffMs;
    // KafkaProducer的版本
    this.apiVersions = apiVersions;
    // 如果开启了,则需要传递事务管理器
    this.transactionManager = transactionManager;
    // 初始化Kafka集群未响应的batch集合
    this.inFlightBatches = new HashMap<>();
}

发送线程主要包含了以下参数:

  • 请求相关
    • 单次请求设定的最大请求字节数。
    • 发送请求的等待时间。
    • 发送请求失败后下一次发送请求的等待时间。
  • 消息相关
    • 用于存储消息的RecordAccumulator消息累加器。
    • 是否需要保证消息的发送顺序。
    • 消息发送到Kafka集群后,需要落地的副本数量。
    • 消息发送失败的重试次数。
  • Sender线程相关的
    • 与Kafka集群进行交互的KafkaClient。
    • KafkaProducer的元数据。
    • 线程运行状态更新为正在运行中。
    • 发送线程专属的计数器。
    • 初始化用于存储未收到Kafka集群响应的batch集合。

有关参数设定给大家列一下,方便理解:

参数 定义 默认值
retries 消息发送失败的重试次数 0
max.in.flight.requests.per.connection 批量发送的batch个数 5

启动发送线程Sender

KafkaProducer的构造函数中实例化发送线程Sender后,Kafka会使用KafkaThreadSender线程包装起来,并启动线程:

String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
  1. 线程名称命名为:”kafka-producer-network-thread” + cliendId
  2. 使用KafkaThread包装创建好的Sender线程。

KafkaThread是对Thread类的简单继承,目的在于设定线程的Daemon属性,和对UncaughtExceptionHandler进行处理

clientId如果没有在配置中设定,则会在当前KafkaProducer进程中递增。

Sender线程的运行

调用KafkaThread#start()之后,会执行Thread实现的run()方法,Sender#run()

执行任务

Sender线程正常情况下会执行任务:

// 主要循环,循环到调用#close()方法
while (running) {
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

我们初始化Sender时,就已经设定running=true,也就在线程无中断,running状态不发生变化,主线程未退出等情况下,会一直不断的执行发送任务。

Sender#runOnce()方法用于执行一次发送任务,同时需要对启动事务的发送请求做一些特殊处理。

事务发送前的特殊处理

发送前,主要针对事务做了以下的处理:

if (transactionManager != null) {
    try {
        // 在需要的情况下,重置producerId
        transactionManager.resetProducerIdIfNeeded();
        // 如果事务管理器没有处于事务状态
        if (!transactionManager.isTransactional()) {
            // 此时是一个幂等的producer,确保要有producerId
            maybeWaitForProducerId();
        } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
            // 如果当前事务管理器中还有未发送完的sequence,并且事务管理器的状态不是FETAL_ERROR,则将事务管理器状态更新值FATAL_ERROR
            transactionManager.transitionToFatalError(
                new KafkaException("The client hasn't received acknowledgment for " +
                    "some previously sent messages and can no longer retry them. It isn't safe to continue."));
        } else if (maybeSendAndPollTransactionalRequest()) {
            // 需要发送和拉取事务请求,则直接返回,等待下一次请求发送
            return;
        }

        // 在事务管理器状态为FATAL_ERROR,或者在开启幂等功能条件下没有producer id,则无法继续发送
        if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
            RuntimeException lastError = transactionManager.lastError();
            // 获取事务管理器最后一次发生的错误
            if (lastError != null)
                // 如果累加器还有未发送的batch,则中断这些batch的发送
                maybeAbortBatches(lastError);
            // 拉取Kafka集群返回的响应,并结束本次发送请求
            client.poll(retryBackoffMs, time.milliseconds());
            return;
        } else if (transactionManager.hasAbortableError()) {
            // 如果当前事务管理器有中断错误
            // 中断RecordAccumulator中还没有发送出去的batch
            accumulator.abortUndrainedBatches(transactionManager.lastError());
        }
    } catch (AuthenticationException e) {
        // 已经以error的方式记录,但是传播到这里来进行一些清理
        log.trace("Authentication exception while processing transactional request: {}", e);
        // 设置事务管理器校验失败及异常信息
        transactionManager.authenticationFailed(e);
    }
}
  1. 在需要重置producerId的情况下,重置producerId。
  2. 如果事务管理器存在,但是没有处于事务状态,证明开启了幂等功能幂等功能需要producerId
  3. 如果当前事务管理器中还有未发送完的sequence序号,并且事务管理器的状态不是FATAL_ERROR,此时事务管理器处于异常状态,需要记录异常,并将事务管理器的状态切换至FATAL_ERROR
  4. 如果事务管理器有正在准备发送的请求,则本次发送请求不做任何事情,直接返回。

接下是针对事务管理器处于异常状态下的判断:

  1. 如果当前事务管理器处于FATAL_ERROR状态,或者开启了幂等功能,则无法继续本次的发送请求,如果事务管理器存在异常信息,则还会中断RecordAccumulator中还未发送的batch,同时执行拉取Kafka集群的请求,结束本次发送请求。
  2. 如果当前事务管理器中存在中断错误,则需要中断RecordAccumulator中还未发送出去的batch。

构建生产数据

校验完事务管理器的状态和事务的必要参数后,接下来就进入核心阶段,组装生产请求并发送:

// 获取当前时间戳
long currentTimeMs = time.milliseconds();
// 发送生产数据,这里这是构建发送数据请求,然后将请求放入到Channel中,等待执行
long pollTimeout = sendProducerData(currentTimeMs);

获取当前的时间戳之后,接下来继续看Sender#sendProducerData()

获取已经准备好发送的batch

首先,会从所有的partition中,筛选出已经准备好发送的batch:

// 从连接metadata中获取集群信息
Cluster cluster = metadata.fetch();
// 从累加器中获取处于准备发送状态的分区record累加器
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

继续看RecordAccumulator#ready()方法:

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();
    // 是否有等待获取内存的线程
    boolean exhausted = this.free.queued() > 0;
    // 遍历每个partition的消息队列
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        // 对双向队列进行同步操作,避免还有追加的操作
        synchronized (deque) {
            // 当向比较多的分区生产消息时,当前调用是个热区域
            // 我们通常校验第一个batch,用于避免更昂贵代价的校验
            // 获取队列中第一个封装好的batch
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                // 获取分区信息
                TopicPartition part = entry.getKey();
                // 从集群信息中获取当前partition的leader节点
                Node leader = cluster.leaderFor(part);
                if (leader == null) {
                    // 如果当前分区不存在leader节点,添加到未知leader节点的topic集合中
                    // leader节点不存在的情况下,也是可以进行消息发送的
                    // 需要注意的是,即使双向队列为空,也不会将元素从batch中移除
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                    // 此时partition存在leader节点
                    // 如果就绪节点集合中没有当前leader节点,并且当前partition不处于静默状态

                    // 获取当前batch的已经等待的时间
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // 是否需要进行重试
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // 判断batch是否处于满的状态,需要消息队列有至少两个batch
                    boolean full = deque.size() > 1 || batch.isFull();
                    // 是否超过等待时间
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // 判断是否可发送
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    // 如果可发送,且无需进行重试,则将此partition对应的leader节点置为准备就绪状态
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // 如果不可发送,或者需要进行重试
                        // 计算剩余等待时间
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // 注意:这是一个保守的估计结果
                        // 因为一个不可发送数据的分区可能会有一个临界点,不久之后会发现拥有可发送的数据
                        // 然而这已经足够好了,因为我们只是唤醒一下,然后就会继续sleep剩余的持续时间
                        // 所以取剩余等待时间和下一次就绪状态检查时间的最小值
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    // 包装就绪检查结果
    // 参数包括,就绪的节点,下一次就绪检查的时间,以及没有首领节点的topic集合
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
  1. 遍历每一个RecordAccumulator中存储的partition的消息队列,操作每个消息队列时需要进行同步操作
  2. 为了节约资源,我们每次只需校验每个partition的第一个batch即可。
  3. 首先,需要校验partition对应的leader节点是否存活
  4. 接着,需要判断当前batch已经等待发送的时间剩余等待发送的时间是否需要进行重试,重试的等待时间,是否已经过了需要发送的时间
  5. 然后,根据上一步计算出的各种条件,判断当前是否满足发送条件,如果满足发送条件,则将partition对应的leader节点添加到准备就绪的节点中。
  6. 否则,不满足条件的情况下,需要计算下一次重试发送的时间。

我们最后发现,其实返回的并不是准备好发送的batch,而是准备就绪的leader节点

这样做的意义是,由于leader节点存在多个partition的情况,所以我们在和同一个leader节点建立连接后,可以把leader节点上的partition消息队列一次全部传输过去。

尝试更新集群元数据

由于在获取准备就绪的leader节点过程中,存在未知leader节点的问题,所以我们需要重新拉取一次集群的元数据,看剩余的topic是否已经拥有leader节点:

if (!result.unknownLeaderTopics.isEmpty()) {
   // 这个集合既包括了正在选举主节点的topic,也包括已经失效的topic,
   // 将它们继续添加到metadata中,以确保被metadata包含在内,然后进行一次metadata更新,因为有消息仍需要发送到这些topic上
   for (String topic : result.unknownLeaderTopics)
      this.metadata.add(topic);

   log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
         result.unknownLeaderTopics);
   // 进行一次metadata更新
   this.metadata.requestUpdate();
}
对leader节点进行筛选

接下来,对已经准备就绪的leader节点进行一次筛选,排除那些可能无法连接的leader节点

Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}
汇总每个leader节点需要发送的batch

接下来就将batch的维度从partition转换到leader节点:

// 创建生产请求,汇总每个leader节点需要发送的batch列表
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 添加到运行中的batch集合中
addToInflightBatches(batches);

我们继续看RecordAccumulator#drain()方法:

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        // 汇总每个节点需要发送的batch列表
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}

继续RecordAccumulator#drainBatchesForOneNode()方法:

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    // 获取当前节点的partition信息
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());

    List<ProducerBatch> ready = new ArrayList<>();
    // 为了减少竞争,循环不从索引0开始
    int start = drainIndex = drainIndex % parts.size();
    do {
        // 获取指定索引的partition
        PartitionInfo part = parts.get(drainIndex);
        // 创建topic-partition对象,用于存储关联信息
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        // 计算下一个需要进行排空的索引
        this.drainIndex = (this.drainIndex + 1) % parts.size();

        // 如果当前partition请求处于静默状态,继续遍历下一个partition
        if (isMuted(tp, now))
            continue;
        // 获取当前partition的batch队列
        Deque<ProducerBatch> deque = getDeque(tp);
        if (deque == null)
            // 如果队列为空,继续遍历下一个partition
            continue;
        // 队列不为空的情况下,需要对队列进行同步操作
        synchronized (deque) {
            // 获取当前partition batch队列的第一个batch,但不取出
            ProducerBatch first = deque.peekFirst();
            // 如果不存在第一个batch,继续遍历下一个partition
            if (first == null)
                continue;

            // 如果存在第一个batch
            // 判断是否需要进行失败重试
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
            // 需要进行失败重试,继续遍历下一个partition
            if (backoff)
                continue;
            // 如果大小总和已经超过当次发送总大小的阈值
            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                // 不需要继续发送了,跳出循环
                break;
            } else {
                // 如果没有超出发送总大小阈值
                // 首先对是否停止排空进行验证,主要是对事务状态进行校验
                if (shouldStopDrainBatchesForPartition(first, tp))
                    break;
                // 判断当前partition是否处于事务状态
                boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
                // 生成Kafka集群需要用到的producer id和epoch版本
                ProducerIdAndEpoch producerIdAndEpoch =
                    transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                // 此时需要获取第一个batch
                ProducerBatch batch = deque.pollFirst();
                // 如果需要produce id以及epoch,同时batch不需要序列号
                if (producerIdAndEpoch != null && !batch.hasSequence()) {
                    // 如果当前batch已经分配了一个序列号,我们就不要去修改produce id和序列号,因为可能会产生重复的情况
                    // 尤其是在前一次存在可能被接受的情况,并且如果我们修改了producer id和序列号,本次尝试也可能会被接受,引发了重复
                    // 除此之外,我们会更新为当前partition更新下一个序列号,同时也可以用事务管理器记录batch,这样即使在接收到顺序之外的请求,我们也能根据序列号继续确保维持序列号的顺序
                    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                              "{} being sent to partition {}", producerIdAndEpoch.producerId,
                              producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                    transactionManager.addInFlightBatch(batch);
                }
                // 关闭batch
                batch.close();
                // 修改传输的字节数大小
                size += batch.records().sizeInBytes();
                // 在就绪batch集合中添加当前batch
                ready.add(batch);
                // 设置当前的排空时间
                batch.drained(now);
            }
        }
    } while (start != drainIndex);
    return ready;
}
  1. 此时情况反过来,我们需要以当前leader节点上关联的partition集合,去RecordAccumulator中寻找指定的partition的消息队列。
  2. 由于可能存在竞争的条件下,则遍历不会从索引0开始,
  3. 在partition处于正常状态下,才可以继续后续动作。
  4. 我们尝试获取但不取出消息队列中的第一个batch,用以判断是否需要进行重试,以及计算是否超出请求的最大值。
  5. 如果满足发送数据大小条件限制,则会准备一些发送请求的参数:
    • 事务管理器状态是否满足条件。
    • 处于事务状态下,则还需要ProducerIdEpoch的必备参数对象。
    • 处于事务状态下,还需要获取当前batch的sequence序号,以及生成下一个batch的sequence序号。
  6. 获取有关事务的必要条件后,我们关闭当前batch,不允许继续写入batch。
  7. 最后进行收尾工作,更新当前leader节点需要传输的字节数,并将batch添加到就绪集合中,并设置当前batch的准备就绪的时间。

sequence序号用于Kafka集群验证幂等性。

此时已经获取所有leader节点和每个leader节点需要发送的batch列表。

调整batch的顺序

在需要保证消息顺序的情况下,我们需要将本次将要发送的partition置为静默状态:

// 在需要保证消息顺序的情况下
if (guaranteeMessageOrder) {
    // 关闭所有排空的partition
    for (List<ProducerBatch> batchList : batches.values()) {
        for (ProducerBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}
处理异常状况的batch

接下来将会对处于已经发送但是等待响应超时,已经失效的batch的进行汇总,合并为失效的batch集合:

// 重置下一个batch的失效时间
accumulator.resetNextBatchExpiryTime();
// 获取已经发送超时的正在运行的batch集合
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// 获取当前累加器的所有失效的batch集合
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// 汇总两种情况的失效集合
expiredBatches.addAll(expiredInflightBatches);

对于失效的batch,我们会进行如下处理:

// 遍历所有失效的batch,将它们置为失效
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
        + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    // 对失效batch执行失败操作
    failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
    // 在拥有事务管理器的情况下,并且失效的batch进入了重试
    if (transactionManager != null && expiredBatch.inRetry()) {
        // 标记一下batch还没有发送成功,不允许接着分配sequence序号
        transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
    }
}
  1. 如果是事务型的KafkaProducer,则会抛出异常。
  2. 如果是非事务型KafkaProducer,但是开启了幂等功能,则需要重置producer id,清空不允许分配sequence序号的缓存。
  3. 记录错误信息。
  4. 执行batch的异步回调任务,将错误信息告知开发者。
  5. 将batch移除正在处理的batch集合中,并释放batch占用的堆外内存
发送生产请求

处理完之前发送的失效的batch之后,接下来需要发送生产请求:

long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
// 等待时间必须大于0
pollTimeout = Math.max(pollTimeout, 0);
// 如果有准备就绪的节点,
if (!result.readyNodes.isEmpty()) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    // 如果一些分区已经准备好发送数据了,select时间为0
    // 其它情况下,如果一些分区拥有累加的数据,但是还没有就绪,那么select时间为当前时间戳和过期时间之差
    // 在上述情况的其它情况下,select时间是当前时间戳和集群metadata过期时间之差
    pollTimeout = 0;
}
// 构建生产消息请求,返回获取结果的等待时间
sendProduceRequests(batches, now);
return pollTimeout;
  1. 首先我们需要计算获取Kafka集群返回响应的等待时间:
    • nextReadyCheckDelayMs为所有batch的剩余等待时间的最小值。
    • notReadyTimeout当前客户端的处理上一次请求的剩余等待时间。
    • RecordAccumulator的剩余失效时间
    • 将取三者之间的最小值,但是等待时间必须≥0。
  2. 如果有leader节点已经准备就绪,直接将等待时间置为0。
  3. 发送生产请求。
  4. 返回需要进行等待的时间。

我们继续看发送生产请求的执行过程,Sender#sendProducerRequests()

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

发送过程,会对每个leader节点单独执行发送,除了发送的batch列表,还包括当前请求时间请求的超时时间acks参数。

那么是如何发送到指定的leader节点的呢?

我们继续看Sender#sendProduceRequest()

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;
    // partition以及partition需要传输的batch集合
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    // partition以及partition传输的batch集合
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // 获取版本号
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        // 向后兼容
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }
    // 遍历每个batch
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        // 获取每个batch的record
        MemoryRecords records = batch.records();

        // 通常,在producer构建batch和发送写入请求之间,是允许出现一段延迟的,我们还可能会基于过时的metadata选择消息的格式
        // 在最坏的情况下,我们将会乐观的选择去用新的消息格式,但是发现broker并不支持,所以我们就需要向下转换,以便broker能支持
        // 向下转换已在处理在集群中中出现的边界case,因为broker可能在不同版本中并不支持同样的消息格式
        // 举个例子,如果一个partition从一个支持的版本,迁移到一个不支持的版本,我们就需要进行转换
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        // 向两种集合中添加不同分类的待发送记录
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }
    // 获取transaction id
    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    // 根据版本,构建request
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                                                                            produceRecordsByPartition, transactionalId);
    // 构建请求完成后的回调任务
    RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());

    String nodeId = Integer.toString(destination);
    // 创建请求
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                                                          requestTimeoutMs, callback);
    // 发送请求,请求完成后,会执行回调
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

代码虽然比较多,但是逻辑比较清晰:

  1. 首先也是要遍历当前leader节点需要发送的batch列表,对其中不符合版本条件的batch进行转换,使其符合发送的版本条件
  2. 在遍历过程中会生成两个集合:
    • topic-partition=>MemoryRecords集合用于发送给Kafka集群。
    • topic-partition=>ProducerBatch用与构建响应的回调任务。
  3. 然后,根据要发送的数据acks请求超时时间事务ID当前消息最低版本类型构建生产消息请求建造者对象
  4. 接着,构建当前leader节点写入消息后返回的响应的回调任务。
  5. 最后,根据生产请求建造者对象和回调任务创建客户端请求,由客户端发送请求。

Sender线程的关闭

当Sender线程的运行状态running置为false后,将不再继续执行runOnce()。

如果我们不是强制关闭,则会有一个关闭的过程:

// 我们停止接收请求,但是可能还会有请求在事务管理器中、累加器中,或者等待ack
// 当前发送线程将一直等待所有请求完成
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
    // 所有请求完成之后,再执行一次
    try {
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

此时的情况是,RecordAccumulator中仍有需要发送的batch,我们还会在进行一次发送。

针对事务存在的情况:

// 在任何提交或者中断,没有经过事务管理器的队列情况下,中断事务
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
    // 事务管理器没有完成的情况下,进行中断
    if (!transactionManager.isCompleting()) {
        log.info("Aborting incomplete transaction due to shutdown");
        transactionManager.beginAbort();
    }
    try {
        // 再执行一次发送
        runOnce();
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

如果事务处理器也没有处理完所有的事务,也需要再进行一次发送。

如果调用的是强制关闭:

if (forceClose) {
    // 存在事务管理器,则关闭事务管理器
    // 此时,我们需要让所有未完成的事务请求或者batch失败,然后唤醒因为异步请求,现在仍然等待的获取结果的线程
    if (transactionManager != null) {
        log.debug("Aborting incomplete transactional requests due to forced shutdown");
        transactionManager.close();
    }
    log.debug("Aborting incomplete batches due to forced shutdown");
    // 中断没有完成的batch
    this.accumulator.abortIncompleteBatches();
}

则中断所有未完成的batch,粗暴的进行关闭。

随后关闭KafkaClient,此后将不会发送任何请求。

try {
	// 关闭client
	this.client.close();
} catch (Exception e) {
	log.error("Failed to close network client", e);
}

总结

在消息发送上,Kafka采取了异步模型,模块各司其职:

  • RecordAccumulator,负责缓存需要发送的partition的消息队列。
  • Sender,是一个后台线程,生命周期与KafkaProducer线程相同,负责定期将RecordAccumulator中的数据以leader节点=>batch列表的维度,发送给Kafka集群。
  • 接收到Kafka集群返回的响应后,会对batch中的每一条record进行回调处理
赞(5) 投币
未经允许不得转载:随遇而安 » Kafka研究院(2)——消息的异步发送
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫打赏

微信扫一扫打赏