Danh mục:
Saga Pattern là gì?
Trong môi trường kiến trúc Microservices (MSA), để đảm bảo tính nhất quán của giao dịch giữa các dịch vụ bị phân tán, Saga Pattern là mô hình kết hợp Giao dịch cục bộ (Local Transaction) của từng dịch vụ với Giao dịch bồi hoàn (Compensating Transaction) nhằm duy trì sự thống nhất cho toàn bộ quy trình giao dịch.
Tại sao lại cần thiết?
- Trong MSA, mỗi dịch vụ có cơ sở dữ liệu (DB) riêng biệt, nên rất khó để sử dụng Giao dịch phân tán truyền thống (2PC - Two-Phase Commit).
- Các tác vụ như gọi hệ thống bên ngoài không thể thực hiện rollback giao dịch theo cách thông thường.
- Để giải quyết vấn đề này, cần một cấu trúc mà ở đó nếu bất kỳ bước nào thất bại, các giao dịch bồi hoàn sẽ được thực hiện để hoàn tác các bước trước đó.
Thành phần cấu tạo
- SagaStep: Đối tượng quản lý trạng thái của từng bước cụ thể (giao dịch cục bộ).
- SagaService: Dịch vụ domain thực hiện việc xử lý chuyển đổi trạng thái và kiểm tra tính nhất quán.
- SagaOrchestrator: Bộ điều phối trung tâm triển khai luồng công việc dựa trên sự kiện và cập nhật trạng thái.
- Kafka: Hệ thống môi giới tin nhắn (message broker) để truyền tin nhắn/sự kiện giữa các dịch vụ.
- EventPublisher/ EventHandler: Lớp trung gian thực hiện phát hành sự kiện domain và truyền chúng tới Kafka.
Tổng kết luồng hoạt động (Workflow)
- [Domain Service] (Ví dụ: PaymentService - Dịch vụ thanh toán) → Xử lý giao dịch cục bộ (Local Transaction). → Phát hành Sự kiện Domain (Domain Event) tùy theo kết quả thành công hay thất bại.
- [Spring EventPublisher] → Truyền sự kiện đến hệ thống sự kiện nội bộ.
- [Event Handler] (Ví dụ: PaymentEventHandler) → Tiếp nhận sự kiện. → Gửi tin nhắn (message) đến Kafka.
- [Kafka Broker] → Lan tỏa sự kiện (Event Propagation).
- [Saga Orchestrator] (Ví dụ: OrderSagaOrchestrator) → Tiếp nhận tin nhắn từ Kafka. → Cập nhật đối tượng trạng thái Saga (SagaInstance/SagaStep). → Lan tỏa lệnh (command) đến bước tiếp theo hoặc thực thi lệnh bồi hoàn (compensation).
- [Saga Service] → Kiểm tra tính nhất quán của trạng thái Saga, xử lý chuyển đổi trạng thái (state transition), xác định xem tiến trình đã hoàn tất hay chưa.
Thiết kế bảng theo dõi trạng thái Saga
- saga_id: ID duy nhất của Saga (UUID)
- saga_type: Loại Saga (Ví dụ: OrderSaga)
- status: Trạng thái tổng thể của Saga (STARTED, IN_PROGRESS, COMPLETED, FAILED, COMPENSATED, v.v.)
- created_at / updated_at: Dấu thời gian (Timestamp)
Bảng saga_step
- step_id: ID duy nhất của bước
- saga_id: Tham chiếu đến Saga cha
- step_name: Tên bước (Ví dụ: ProcessPayment)
- status: Trạng thái của bước (PENDING, COMPLETED, FAILED, COMPENSATED)
- order_index: Thứ tự thực hiện
- last_error: Lý do thất bại
- compensated_at: Thời gian hoàn tất giao dịch bồi hoàn
Ví dụ mã nguồn: Domain Event → Kafka → Orchestrator
PaymentService.java
@Service @RequiredArgsConstructor public class PaymentService { private final PaymentRepository paymentRepository; private final ApplicationEventPublisher eventPublisher; @Transactional public void processPayment(PaymentRequest request) { try { // 1. Xử lý logic thanh toán cục bộ Payment payment = paymentRepository.save(request.toEntity()); // 2. Phát hành sự kiện thành công nội bộ eventPublisher.publishEvent(new PaymentSucceededEvent(payment.getSagaId())); } catch (Exception e) { // 3. Phát hành sự kiện thất bại nội bộ khi có lỗi eventPublisher.publishEvent(new PaymentFailedEvent(request.getSagaId(), e.getMessage())); throw e; } } }
PaymentEventHandler.java
@Component public class PaymentEventHandler { private final KafkaTemplate<String, Object> kafkaTemplate; @EventListener public void on(PaymentFailedEvent event) { kafkaTemplate.send("payment.failed", event); } @EventListener public void on(PaymentCompletedEvent event) { kafkaTemplate.send("payment.completed", event); } }
OrderSagaOrchestrator.java
@Component public class OrderSagaOrchestrator { private final SagaService sagaService; private final KafkaTemplate<String, Object> kafka; @KafkaListener(topics = "payment.failed") public void handlePaymentFailed(PaymentFailedEvent event) { sagaService.markStepFailed(event.getSagaId(), "ProcessPayment", event.getReason()); kafka.send("order.cancel", new CancelOrderCommand(event.getOrderId())); } @KafkaListener(topics = "payment.completed") public void handlePaymentCompleted(PaymentCompletedEvent event) { sagaService.markStepCompleted(event.getSagaId(), "ProcessPayment"); kafka.send("inventory.reserve", new ReserveInventoryCommand(event.getOrderId())); } }
SagaService.java
public class SagaService { private final SagaRepository sagaRepository; public void markStepCompleted(UUID sagaId, String stepName) { SagaInstance saga = sagaRepository.findByIdWithSteps(sagaId); SagaStep step = saga.getStep(stepName); step.markCompleted(); saga.updateStatusIfNeeded(); sagaRepository.save(saga); } public void markStepFailed(UUID sagaId, String stepName, String reason) { SagaInstance saga = sagaRepository.findByIdWithSteps(sagaId); SagaStep step = saga.getStep(stepName); step.markFailed(reason); saga.markFailed(); sagaRepository.save(saga); } }
Luồng xử lý thử lại (Retry) và Trễ (Delay)
- Khi cần thực hiện lại, PaymentService sẽ tự thực hiện retry nội bộ.
- Sự kiện (Event) chỉ được phát hành khi kết quả Thành công hoặc Thất bại đã được xác định cuối cùng.
- SagaOrchestrator sẽ triển khai luồng xử lý dựa trên thời điểm nhận được sự kiện đó.
Tóm tắt Q&A
Q1: Nếu việc thử lại thanh toán mất nhiều thời gian, liệu luồng Saga có bị lỗi không? Trả lời: Không. Vì SagaOrchestrator xử lý dựa trên ý nghĩa của sự kiện nhận được, nên luồng công việc vẫn có thể tiếp tục mà không bị ảnh hưởng bởi việc thử lại hay độ trễ.
Q2: Tại sao Domain Service không gọi trực tiếp Kafka? Trả lời: Để tách biệt các mối quan tâm (Separation of Concerns). Bằng cách ủy thác trách nhiệm nhắn tin cho tầng hạ tầng (Infrastructure) và để logic domain chỉ thực hiện phát hành sự kiện, việc kiểm thử, bảo trì và mở rộng sẽ trở nên tốt hơn.
Q3: Giao dịch bồi hoàn (Compensating Transaction) được thực hiện như thế nào? Trả lời: Khi phát hiện một bước thất bại, SagaOrchestrator sẽ gửi lệnh bồi hoàn qua Kafka để dịch vụ tương ứng thực hiện các tác vụ hoàn tác (ví dụ: hoàn tiền, khôi phục kho hàng).
Đảm bảo thứ tự tin nhắn Kafka và Thiết kế Partition Key
Kafka đảm bảo thứ tự tin nhắn trong cùng một Partition, nhưng không đảm bảo trên toàn bộ Topic.
Tại sao thứ tự lại quan trọng?
- Trong luồng sự kiện Saga, các vấn đề về thứ tự sau đây có thể xảy ra:
- [PaymentCompletedEvent] → Tiến hành trừ tồn kho.
- [PaymentFailedEvent] → Hủy đơn hàng và hoàn tiền.
❌ Nếu thứ tự bị đảo ngược: Có thể dẫn đến vấn đề tồn kho bị trừ cho một đơn hàng đã bị hủy.
Giải pháp: Chỉ định Key cho tin nhắn Kafka
- Kafka chọn Partition dựa trên Key khi gửi tin nhắn: kafkaTemplate.send("payment.failed", sagaId.toString(), event);
- Nếu chỉ định cùng một sagaId làm Key, tin nhắn sẽ luôn được gửi đến cùng một Partition.
- Kafka đảm bảo thứ tự tuyệt đối bên trong cùng một Partition.
Partition Key đề xuất
- Saga dựa trên đơn hàng: orderId
- Tập trung vào thực thể Saga: sagaId
- Giao dịch theo đơn vị người dùng (Lưu ý): userId
Trong hầu hết các trường hợp, sử dụng sagaId hoặc orderId là an toàn nhất
Lưu ý
Khả năng giảm tính song song: Cùng Key → cùng Partition → cùng Consumer → có khả năng gây nghẽn cổ chai (bottleneck) xử lý.
Lưu ý về độ lệch (Skew): Nếu một ID cụ thể tập trung quá nhiều dữ liệu, một số Consumer có thể bị quá tải.
Duy trì chiến lược khi truyền qua DLQ: Dead Letter Queue cũng phải tuân theo chiến lược Key tương tự để duy trì thứ tự.
KafkaListener và Đảm bảo thứ tự
@KafkaListener(topics = "payment.result", concurrency = "3") public void handlePaymentEvents(PaymentEvent event) { // Nếu cùng một partition, các sự kiện sẽ được xử lý tuần tự trên cùng một thread }
Kafka xử lý tuần tự theo đơn vị Partition, do đó các sự kiện có cùng Key sẽ duy trì chính xác thứ tự của chúng.
Tóm tắt
- Saga Pattern là phương thức thay thế cho giao dịch phân tán, duy trì tính nhất quán thông qua chuyển đổi trạng thái và bồi hoàn.
- Được thiết kế theo cấu trúc dựa trên sự kiện (Event-driven), cốt lõi là sự kết nối giữa Domain Service ↔ Kafka ↔ Orchestrator.
- Thông qua bảng theo dõi trạng thái, hệ thống có thể tái khởi động khi bị gián đoạn, giám sát và phân tích lịch sử thất bại.
- Có khả năng ứng phó linh hoạt về mặt cấu trúc với các xử lý trễ hoặc thử lại (retry).
Nguồn bài viết ryukato.github.io