Danh mục:
Tài liệu này tổng hợp quá trình cải thiện Message Dispatcher sử dụng Kotlin Coroutines trong kiến trúc xử lý tin nhắn bất đồng bộ dựa trên Kafka, kèm theo các ví dụ mã nguồn (code) theo từng bước.
Bước 1: Dispatcher cơ bản
Chức năng:
- Nhận tin nhắn thông qua Channel và chuyển đến Handler.
- Chỉ xuất log (ghi nhật ký) khi xảy ra ngoại lệ.
Cấu trúc cơ bản Kafka Consumer → Coroutine Channel → Message Dispatcher → Message Handler
Vấn đề (Hạn chế)
- Không có biện pháp xử lý tiếp theo nào khi xảy ra ngoại lệ.
- Có khả năng tin nhắn bị tiêu thụ lặp lại (duplicate consumption) hoặc bị thất lạc (data loss).
class MessageDispatcher(
private val channel: ReceiveChannel<DomainMessage>,
private val handler: MessageHandler
) {
suspend fun start() {
for (message in channel) {
try {
handler.handle(message)
} catch (e: Exception) {
println("❌ Error: ")
}
}
}
}
Bước 2: Thêm chính sách xử lý ngoại lệ
Chức năng
- Phân loại ngoại lệ thông qua
shouldRetry(e)vàshouldStopConsuming(e). - Có thể phân biệt giữa lỗi tạm thời (transient error) và lỗi nghiêm trọng (fatal error).
interface ExceptionHandlingPolicy { fun shouldRetry(e: Throwable): Boolean fun shouldStopConsuming(e: Throwable): Boolean } class BasicPolicy : ExceptionHandlingPolicy { override fun shouldRetry(e: Throwable) = false override fun shouldStopConsuming(e: Throwable) = e is IllegalStateException }
Bên trong Dispatcher:
if (policy.shouldStopConsuming(e)) { scope.cancel("Fatal", e) }
Bước 3: Áp dụng RetryQueue / DLQ
Chức năng
- Lưu các ngoại lệ tạm thời (như IOException) vào RetryQueue để làm đối tượng thử lại.
- Lưu các ngoại lệ không nghiêm trọng nhưng cũng không thuộc diện cần thử lại vào DeadLetterQueue
interface RetryQueue { suspend fun enqueue(message: DomainMessage, reason: Throwable) } interface DeadLetterQueue { suspend fun publish(message: DomainMessage, reason: Throwable) }
Bước 4: Thêm commit offset khi Retry
Chức năng
- Lưu tin nhắn cần thử lại (retry) vào retryQueue, sau đó thực hiện commit offset lên Kafka.
- Đảm bảo tin nhắn tương tự không bị tiêu thụ lại từ Kafka.
if (policy.shouldRetry(e)) {
retryQueue.enqueue(message, e)
commitOffset(message.offset)
}
Bước 5: Ngăn chặn trùng lặp DLQ và xử lý bỏ qua (Skip)
Chức năng
- Không ghi lại những tin nhắn đã được lưu trong DLQ.
- Dispatcher sẽ bỏ qua các tin nhắn đang có trong DLQ và thực hiện commit offset ngay lập tức.
class InMemoryDeadLetterQueue : DeadLetterQueue { private val recorded = ConcurrentHashMap.newKeySet<Long>() override suspend fun publish(message: DomainMessage, reason: Throwable) { if (recorded.add(message.offset)) { println("DLQ: offset=") } } override fun isRecorded(offset: Long): Boolean = recorded.contains(offset) }
Bên trong Dispatcher:
if (deadLetterQueue.isRecorded(message.offset)) { commitOffset(message.offset) continue }
Bước 6: Cấu trúc xử lý ngoại lệ tích hợp cuối cùng
Ví dụ xử lý ngoại lệ bên trong khối launch của Dispatcher:
launch {
val result = retryHandler.withRetry(message) {
handler.handle(message)
}
result.onSuccess {
commitOffset(message.offset)
}.onFailure { e ->
when {
policy.shouldRetry(e) -> {
retryQueue.enqueue(message, e)
commitOffset(message.offset)
}
policy.shouldStopConsuming(e) -> {
scope.cancel("Stopping consumer due to fatal error", e)
}
else -> {
deadLetterQueue.publish(message, e)
commitOffset(message.offset)
}
}
}
}
Tóm tắt cấu trúc Dispatcher
- Nhận tin nhắn Kafka thông qua Coroutine Channel.
- Xử lý rẽ nhánh dựa trên chính sách ngoại lệ (shouldRetry, shouldStopConsuming).
- Bỏ qua tin nhắn DLQ và ngăn chặn ghi trùng lặp.
- Làm rõ việc commit offset đối với tin nhắn thử lại (retry) và tin nhắn DLQ.
Tóm tắt luồng xử lý tin nhắn
- Xử lý thành công bình thường → commitOffset
- Ngoại lệ tạm thời (có thể thử lại) → RetryQueue.enqueue + commitOffset
- Ngoại lệ cần dừng hệ thống → scope.cancel(...)
- Các lỗi khác (không thể phục hồi) → DeadLetterQueue.publish + commitOffset
Tóm tắt nguyên tắc thiết kế
- Ngăn chặn tiêu thụ trùng lặp tin nhắn (đáp ứng at-least-once).
- Đảm bảo tính ổn định của luồng thông qua việc kiểm soát offset Kafka.
- Kiểm soát Backpressure bất đồng bộ (Semaphore).
- Đảm bảo khả năng mở rộng dựa trên chính sách Retry / DLQ.
Khả năng mở rộng trong tương lai
- Liên kết DLQ, RetryQueue → Kafka Topic.
- Mở rộng RetryHandler → Chiến lược Exponential Backoff (lùi lũy thừa).
- Tăng cường giám sát bằng cách tích hợp Micrometer, Prometheus.
- Tích hợp với Spring Integration/WebFlux.
Ví dụ về các dependency cần thiết khi sử dụng
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
testImplementation("org.junit.jupiter:junit-jupiter")
}
Toàn bộ mã nguồn (Full Code)
data class DomainMessage( val key: String, val value: String, val offset: Long )
interface MessageHandler { suspend fun handle(message: DomainMessage) }
interface ExceptionHandlingPolicy { fun shouldRetry(e: Throwable): Boolean fun shouldStopConsuming(e: Throwable): Boolean }
class SeverityBasedPolicy : ExceptionHandlingPolicy { override fun shouldRetry(e: Throwable) = e is IOException || e is TimeoutCancellationException override fun shouldStopConsuming(e: Throwable) = e is IllegalStateException || e is OutOfMemoryError }
interface RetryQueue { suspend fun enqueue(message: DomainMessage, reason: Throwable) }
class InMemoryRetryQueue : RetryQueue { private val retried = mutableListOf<Pair<DomainMessage, Throwable>>() override suspend fun enqueue(message: DomainMessage, reason: Throwable) { println("🔁 RETRY: offset=, reason=") retried.add(message to reason) } }
interface DeadLetterQueue { suspend fun publish(message: DomainMessage, reason: Throwable) fun isRecorded(offset: Long): Boolean }
import java.util.concurrent.ConcurrentHashMap class InMemoryDeadLetterQueue : DeadLetterQueue { private val deadSet = ConcurrentHashMap.newKeySet<Long>() private val messages = mutableListOf<Pair<DomainMessage, Throwable>>() override suspend fun publish(message: DomainMessage, reason: Throwable) { if (deadSet.add(message.offset)) { println("🔴 DLQ: offset=, reason=") messages.add(message to reason) } else { println("Ignore adding since already in: offset=") } } override fun isRecorded(offset: Long): Boolean = deadSet.contains(offset) }
interface RetryHandler { suspend fun <T> withRetry( message: DomainMessage, block: suspend () -> T ): Result<T> }
import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.delay class FixedDelayIntervalRetryHandler( private val maxAttempts: Int, private val delayMillis: Long ) : RetryHandler { private val retryCounts = ConcurrentHashMap<String, Int>() override suspend fun <T> withRetry( message: DomainMessage, block: suspend () -> T ): Result<T> { val key = message.offset.toString() val attempts = retryCounts.getOrDefault(key, 0) return try { val result = block() retryCounts.remove(key) Result.success(result) } catch (e: Throwable) { if (attempts + 1 >= maxAttempts) { retryCounts.remove(key) Result.failure(e) } else { retryCounts[key] = attempts + 1 delay(delayMillis) Result.failure(e) } } } }
interface CoroutineMessageDispatcher { suspend fun start(scope: CoroutineScope) }
class DefaultCoroutineMessageDispatcher( private val channel: ReceiveChannel<DomainMessage>, private val handlers: List<MessageHandler>, private val policy: ExceptionHandlingPolicy, private val commitOffset: suspend (Long) -> Unit, private val retryHandler: RetryHandler, private val deadLetterQueue: DeadLetterQueue, private val retryQueue: RetryQueue ) : CoroutineMessageDispatcher { private val lastCommittedOffset = AtomicLong(-1L) override suspend fun start(scope: CoroutineScope) { val semaphore = Semaphore(handlers.size) for (handler in handlers) { scope.launch { for (message in channel) { if (deadLetterQueue.isRecorded(message.offset)) { println("Skip DLQ’d message at offset=") commitOffset(message.offset) lastCommittedOffset.set(message.offset) continue } semaphore.acquire() launch { try { val result = retryHandler.withRetry(message) { println("Handling message: $message") handler.handle(message) } result.onSuccess { commitOffset(message.offset) lastCommittedOffset.set(message.offset) }.onFailure { e -> when { policy.shouldRetry(e) -> { retryQueue.enqueue(message, e) commitOffset(message.offset) lastCommittedOffset.set(message.offset) } policy.shouldStopConsuming(e) -> { println("Fatal error - stop consuming") scope.cancel("Fatal exception. Stopping dispatcher.", e) } else -> { deadLetterQueue.publish(message, e) commitOffset(message.offset) lastCommittedOffset.set(message.offset) } } } } catch (e: Throwable) { println("Dispatcher error: ") scope.cancel("Dispatcher failed", e) } finally { semaphore.release() } } } } } } }
import java.io.IOException import kotlin.time.Duration.Companion.seconds import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test class MessageDispatcherTest { @Test fun `dispatcher processes messages and handles exceptions`() = runBlocking { val channel = Channel<DomainMessage>(capacity = 100) val dispatcher = DefaultCoroutineMessageDispatcher( channel = channel, handlers = listOf(object : MessageHandler { override suspend fun handle(message: DomainMessage) { when { message.offset % 5 == 0L -> throw IOException("Temporary error") message.offset % 6 == 0L -> throw RuntimeException("To dead-letter-queue") message.offset % 7 == 0L -> throw IllegalStateException("Fatal error") else -> println("✅ handled offset=") } } }), policy = SeverityBasedPolicy(), commitOffset = { offset -> println("☑️ commit offset=$offset") }, retryHandler = FixedDelayIntervalRetryHandler(3, 100), deadLetterQueue = InMemoryDeadLetterQueue(), retryQueue = InMemoryRetryQueue() ) val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) scope.launch { var offset = 0L while (isActive) { delay(10) channel.send(DomainMessage("key-$offset", "value-$offset", offset++)) } } dispatcher.start(scope) delay(5.seconds) scope.cancel() } }
Nguồn bài viết ryukato.github.io