Alasan kita membutuhkan a Kotak Keluar Transaksional adalah bahwa suatu layanan seringkali perlu memposting pesan sebagai bagian dari transaksi yang memperbarui database. Pembaruan basis data dan pengiriman pesan harus terjadi dalam sebuah transaksi. Sebaliknya, jika layanan tidak melakukan kedua operasi ini secara otomatis, kegagalan dapat menyebabkan sistem dalam keadaan tidak konsisten.
ITU Repositori GitHub dengan kode sumber untuk artikel ini.
Pada artikel ini, kami akan mengimplementasikannya menggunakan Reactive Spring dan Kotlin dengan Coroutine. Berikut adalah daftar lengkap dependensi yang digunakan: Kotlin dengan coroutine , Boot Musim Semi 3 , WebFlux , R2DBC , postgres , MongoDB , Kafka , grafana , Prometheus , Zipkin Dan Mikrometer untuk dapat diamati.
ITU Kotak Keluar Transaksional pola memecahkan masalah dalam implementasi di mana biasanya transaksi mencoba memperbarui tabel database dan kemudian memposting pesan ke broker dan melakukan transaksi. Tapi inilah masalahnya: jika langkah terakhir dari transaksi gagal, transaksi akan mengembalikan perubahan database, tetapi acara tersebut telah diposting ke broker. Jadi kita perlu menemukan cara untuk memastikan bahwa kedua database ditulis dan dipublikasikan ke broker. Gagasan tentang bagaimana kita dapat menyelesaikannya adalah dalam satu transaksi, simpan di tabel perintah, dan dalam transaksi yang sama, simpan di tabel kotak keluar dan komit transaksi. Selanjutnya, kita perlu memposting peristiwa yang direkam dari tabel kotak keluar ke broker.
Kami memiliki dua cara untuk melakukan ini; menggunakan sebuah CDC (Edit tangkapan data) alat seperti Debezium yang terus-menerus memantau database Anda dan memungkinkan aplikasi Anda untuk menyiarkan setiap perubahan tingkat baris dalam urutan yang sama dengan komitmennya ke database dan P olling editor . Untuk proyek ini, kami menggunakan editor survei. Sangat disarankan oleh Chris Richardson Buku: Pola layanan mikro , di mana model Kotak Keluar Transaksional dijelaskan dengan sangat baik.
Lebih penting lagi, kita perlu bersiap untuk kasus di mana acara yang sama dapat diposting lebih dari satu kali, sehingga konsumen harus idempoten. Idempotensi menjelaskan keandalan pesan dalam sistem terdistribusi, khususnya penerimaan pesan duplikat. Karena fitur coba ulang atau perantara pesan, pesan yang dikirim satu kali dapat diterima berkali-kali oleh konsumen. Suatu layanan bersifat idempoten jika memproses peristiwa yang sama beberapa kali menghasilkan keadaan dan keluaran yang sama karena memproses peristiwa itu hanya sekali. Menerima peristiwa duplikat tidak mengubah status atau perilaku aplikasi. Sebagian besar waktu, layanan idempoten mendeteksi kejadian ini dan mengabaikannya. Idempotensi dapat diimplementasikan menggunakan pengidentifikasi unik.
Jadi, mari kita terapkan. Logika bisnis dari contoh layanan mikro kami sederhana: pesanan dengan item dari toko produk; itu dua tabel untuk kesederhanaan dan tabel kotak keluar, tentu saja. Biasanya saat tabel kotak keluar terlihat seperti berada di bidang data, kami menyimpan acara bersambung. Yang paling umum adalah format JSON, tetapi itu tergantung pada Anda dan layanan mikro yang konkret. Kita dapat menempatkan sebagai perubahan status bidang data atau hanya menempatkan entitas domain perintah lengkap terakhir yang diperbarui setiap kali; tentu saja, perubahan status membutuhkan ukuran yang jauh lebih kecil, tetapi sekali lagi, itu terserah Anda. Bidang lain dalam tabel kotak keluar biasanya menyertakan jenis peristiwa, stempel waktu, versi, dan metadata lainnya. Itu tergantung pada setiap implementasi konkret, tetapi seringkali itu adalah persyaratan minimum. Bidang versi adalah untuk kontrol konkurensi.
Semua antarmuka UI akan tersedia di port:
UI yang menyombongkan diri URL .
Grafana UI URL .
Zipkin UI URL .
Kafka UI URL .
UI Prometheus URL .
ITU komposisi buruh pelabuhan file dalam artikel ini berisi Postgres, MongoDB, zookeeper, Kafka, Kafka-ui, Zipkin, Prometheus dan Grafana,
Untuk pengembangan lokal: gunakan make local
Atau make develop
jalankan hanya docker-compose terlebih dahulu, lalu sertakan gambar layanan mikro.
version: "3.9"
services:
microservices_postgresql:
image: postgres:latest
container_name: microservices_postgresql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=microservices
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./docker_data/microservices_pgdata:/var/lib/postgresql/data
networks: [ "microservices" ]
zoo1:
image: confluentinc/cp-zookeeper:7.3.0
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
volumes:
- "./zookeeper:/zookeeper"
networks: [ "microservices" ]
kafka1:
image: confluentinc/cp-kafka:7.3.0
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
volumes:
- "./kafka_data:/kafka"
networks: [ "microservices" ]
kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "8086:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19092
networks: [ "microservices" ]
zipkin-all-in-one:
image: openzipkin/zipkin:latest
restart: always
ports:
- "9411:9411"
networks: [ "microservices" ]
mongo:
image: mongo
restart: always
ports:
- "27017:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin
MONGODB_DATABASE: bank_accounts
networks: [ "microservices" ]
prometheus:
image: prom/prometheus:latest
container_name: prometheus
ports:
- "9090:9090"
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
networks: [ "microservices" ]
node_exporter:
container_name: microservices_node_exporter
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "microservices" ]
grafana:
container_name: microservices_grafana
restart: always
image: grafana/grafana
ports:
- '3000:3000'
networks: [ "microservices" ]
networks:
microservices:
  name: microservices
Skema database Postgres untuk proyek ini adalah:
Pengontrol REST domain perintah menggunakan metode berikut:
@RestController
@RequestMapping(path = ["/api/v1/orders"])
class OrderController(private val orderService: OrderService, private val or: ObservationRegistry) {
@GetMapping
@Operation(method = "getOrders", summary = "get order with pagination", operationId = "getOrders")
suspend fun getOrders(
@RequestParam(name = "page", defaultValue = "0") page: Int,
@RequestParam(name = "size", defaultValue = "20") size: Int,
) = coroutineScopeWithObservation(GET_ORDERS, or) { observation ->
ResponseEntity.ok()
.body(orderService.getAllOrders(PageRequest.of(page, size))
.map { it.toSuccessResponse() }
.also { response -> observation.highCardinalityKeyValue("response", response.toString()) }
)
}
@GetMapping(path = ["{id}"])
@Operation(method = "getOrderByID", summary = "get order by id", operationId = "getOrderByID")
suspend fun getOrderByID(@PathVariable id: String) = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation ->
ResponseEntity.ok().body(orderService.getOrderWithProductsByID(UUID.fromString(id)).toSuccessResponse())
.also { response ->
observation.highCardinalityKeyValue("response", response.toString())
log.info("getOrderByID response: $response")
}
}
@PostMapping
@Operation(method = "createOrder", summary = "create new order", operationId = "createOrder")
suspend fun createOrder(@Valid @RequestBody createOrderDTO: CreateOrderDTO) = coroutineScopeWithObservation(CREATE_ORDER, or) { observation ->
ResponseEntity.status(HttpStatus.CREATED).body(orderService.createOrder(createOrderDTO.toOrder()).toSuccessResponse())
.also {
log.info("created order: $it")
observation.highCardinalityKeyValue("response", it.toString())
}
}
@PutMapping(path = ["/add/{id}"])
@Operation(method = "addProductItem", summary = "add to the order product item", operationId = "addProductItem")
suspend fun addProductItem(
@PathVariable id: UUID,
@Valid @RequestBody dto: CreateProductItemDTO
) = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation ->
ResponseEntity.ok().body(orderService.addProductItem(dto.toProductItem(id)))
.also {
observation.highCardinalityKeyValue("CreateProductItemDTO", dto.toString())
observation.highCardinalityKeyValue("id", id.toString())
log.info("addProductItem id: $id, dto: $dto")
}
}
@PutMapping(path = ["/remove/{orderId}/{productItemId}"])
@Operation(method = "removeProductItem", summary = "remove product from the order", operationId = "removeProductItem")
suspend fun removeProductItem(
@PathVariable orderId: UUID,
@PathVariable productItemId: UUID
) = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation ->
ResponseEntity.ok().body(orderService.removeProductItem(orderId, productItemId))
.also {
observation.highCardinalityKeyValue("productItemId", productItemId.toString())
observation.highCardinalityKeyValue("orderId", orderId.toString())
log.info("removeProductItem orderId: $orderId, productItemId: $productItemId")
}
}
@PutMapping(path = ["/pay/{id}"])
@Operation(method = "payOrder", summary = "pay order", operationId = "payOrder")
suspend fun payOrder(@PathVariable id: UUID, @Valid @RequestBody dto: PayOrderDTO) = coroutineScopeWithObservation(PAY_ORDER, or) { observation ->
ResponseEntity.ok().body(orderService.pay(id, dto.paymentId).toSuccessResponse())
.also {
observation.highCardinalityKeyValue("response", it.toString())
log.info("payOrder result: $it")
}
}
@PutMapping(path = ["/cancel/{id}"])
@Operation(method = "cancelOrder", summary = "cancel order", operationId = "cancelOrder")
suspend fun cancelOrder(@PathVariable id: UUID, @Valid @RequestBody dto: CancelOrderDTO) = coroutineScopeWithObservation(CANCEL_ORDER, or) { observation ->
ResponseEntity.ok().body(orderService.cancel(id, dto.reason).toSuccessResponse())
.also {
observation.highCardinalityKeyValue("response", it.toString())
log.info("cancelOrder result: $it")
}
}
@PutMapping(path = ["/submit/{id}"])
@Operation(method = "submitOrder", summary = "submit order", operationId = "submitOrder")
suspend fun submitOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(SUBMIT_ORDER, or) { observation ->
ResponseEntity.ok().body(orderService.submit(id).toSuccessResponse())
.also {
observation.highCardinalityKeyValue("response", it.toString())
log.info("submitOrder result: $it")
}
}
@PutMapping(path = ["/complete/{id}"])
@Operation(method = "completeOrder", summary = "complete order", operationId = "completeOrder")
suspend fun completeOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(COMPLETE_ORDER, or) { observation ->
ResponseEntity.ok().body(orderService.complete(id).toSuccessResponse())
.also {
observation.highCardinalityKeyValue("response", it.toString())
log.info("completeOrder result: $it")
}
}
}
Seperti yang saya sebutkan sebelumnya, ide utama implementasi kotak keluar transaksional adalah bahwa pada langkah pertama transaksi, tulis ke perintah dan tabel kotak keluar dan lakukan transaksi, sebagai tambahan, tetapi tidak wajib, untuk pengoptimalan. Kita dapat, dengan metode yang sama, setelah berhasil melakukan transaksi, lalu memposting acara tersebut dan menghapusnya dari tabel kotak keluar. Tapi di sini, jika salah satu langkah memposting ke broker atau menghapus tabel dari kotak keluar gagal, tidak apa-apa karena kami memiliki pembuat jajak pendapat sebagai proses terjadwal. Ini adalah pengoptimalan dan peningkatan kecil, dan tidak wajib menerapkan model kotak keluar. Coba kedua varian dan pilih salah satu yang paling sesuai dengan kasus Anda. Dalam kasus kita, kita menggunakan Kafka, jadi perlu diingat bahwa producer memiliki parameter acks,
Kapan acks=0, produsen menganggap pesan “berhasil ditulis” saat pesan dikirim tanpa menunggu broker menerimanya sama sekali. Jika broker terputus atau pengecualian terjadi, kami tidak akan mengetahuinya dan akan kehilangan data, jadi berhati-hatilah dengan pengaturan ini dan jangan gunakan acks=0.
Kapan terdakwa = 1 produsen menganggap pesan “berhasil ditulis” ketika pesan tersebut hanya dikenali oleh pemimpin.
Kapan ack = semua produsen menganggap pesan “berhasil ditulis” ketika diterima oleh semua replika tersinkronisasi (ISR).
Dalam diagram urutan yang disederhanakan untuk logika bisnis lapisan layanan, langkah 5 dan 6 adalah pengoptimalan opsional dan tidak diperlukan karena kami memiliki editor jajak pendapat Bagaimanapun:
ITU layanan pesanan Penerapan:
interface OrderService {
suspend fun createOrder(order: Order): Order
suspend fun getOrderByID(id: UUID): Order
suspend fun addProductItem(productItem: ProductItem)
suspend fun removeProductItem(orderID: UUID, productItemId: UUID)
suspend fun pay(id: UUID, paymentId: String): Order
suspend fun cancel(id: UUID, reason: String?): Order
suspend fun submit(id: UUID): Order
suspend fun complete(id: UUID): Order
suspend fun getOrderWithProductsByID(id: UUID): Order
suspend fun getAllOrders(pageable: Pageable): Page<Order>
suspend fun deleteOutboxRecordsWithLock()
}
@Service
class OrderServiceImpl(
private val orderRepository: OrderRepository,
private val productItemRepository: ProductItemRepository,
private val outboxRepository: OrderOutboxRepository,
private val orderMongoRepository: OrderMongoRepository,
private val txOp: TransactionalOperator,
private val eventsPublisher: EventsPublisher,
private val kafkaTopicsConfiguration: KafkaTopicsConfiguration,
private val or: ObservationRegistry,
private val outboxEventSerializer: OutboxEventSerializer
) : OrderService {
override suspend fun createOrder(order: Order): Order = coroutineScopeWithObservation(CREATE, or) { observation ->
txOp.executeAndAwait {
orderRepository.insert(order).let {
val productItemsEntityList = ProductItemEntity.listOf(order.productsList(), UUID.fromString(it.id))
val insertedItems = productItemRepository.insertAll(productItemsEntityList).toList()
it.addProductItems(insertedItems.map { item -> item.toProductItem() })
Pair(it, outboxRepository.save(outboxEventSerializer.orderCreatedEventOf(it)))
}
}.run {
observation.highCardinalityKeyValue("order", first.toString())
observation.highCardinalityKeyValue("outboxEvent", second.toString())
publishOutboxEvent(second)
first
}
}
override suspend fun addProductItem(productItem: ProductItem): Unit = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation ->
txOp.executeAndAwait {
val order = orderRepository.findOrderByID(UUID.fromString(productItem.orderId))
order.incVersion()
val updatedProductItem = productItemRepository.upsert(productItem)
val savedRecord = outboxRepository.save(
outboxEventSerializer.productItemAddedEventOf(
order,
productItem.copy(version = updatedProductItem.version).toEntity()
)
)
orderRepository.updateVersion(UUID.fromString(order.id), order.version)
.also { result -> log.info("addOrderItem result: $result, version: ${order.version}") }
savedRecord
}.run {
observation.highCardinalityKeyValue("outboxEvent", this.toString())
publishOutboxEvent(this)
}
}
override suspend fun removeProductItem(orderID: UUID, productItemId: UUID): Unit = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation ->
txOp.executeAndAwait {
if (!productItemRepository.existsById(productItemId)) throw ProductItemNotFoundException(productItemId)
val order = orderRepository.findOrderByID(orderID)
productItemRepository.deleteById(productItemId)
order.incVersion()
val savedRecord = outboxRepository.save(outboxEventSerializer.productItemRemovedEventOf(order, productItemId))
orderRepository.updateVersion(UUID.fromString(order.id), order.version)
.also { log.info("removeProductItem update order result: $it, version: ${order.version}") }
savedRecord
}.run {
observation.highCardinalityKeyValue("outboxEvent", this.toString())
publishOutboxEvent(this)
}
}
override suspend fun pay(id: UUID, paymentId: String): Order = coroutineScopeWithObservation(PAY, or) { observation ->
txOp.executeAndAwait {
val order = orderRepository.getOrderWithProductItemsByID(id)
order.pay(paymentId)
val updatedOrder = orderRepository.update(order)
Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderPaidEventOf(updatedOrder, paymentId)))
}.run {
observation.highCardinalityKeyValue("order", first.toString())
observation.highCardinalityKeyValue("outboxEvent", second.toString())
publishOutboxEvent(second)
first
}
}
override suspend fun cancel(id: UUID, reason: String?): Order = coroutineScopeWithObservation(CANCEL, or) { observation ->
txOp.executeAndAwait {
val order = orderRepository.findOrderByID(id)
order.cancel()
val updatedOrder = orderRepository.update(order)
Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCancelledEventOf(updatedOrder, reason)))
}.run {
observation.highCardinalityKeyValue("order", first.toString())
observation.highCardinalityKeyValue("outboxEvent", second.toString())
publishOutboxEvent(second)
first
}
}
override suspend fun submit(id: UUID): Order = coroutineScopeWithObservation(SUBMIT, or) { observation ->
txOp.executeAndAwait {
val order = orderRepository.getOrderWithProductItemsByID(id)
order.submit()
val updatedOrder = orderRepository.update(order)
updatedOrder.addProductItems(order.productsList())
Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderSubmittedEventOf(updatedOrder)))
}.run {
observation.highCardinalityKeyValue("order", first.toString())
observation.highCardinalityKeyValue("outboxEvent", second.toString())
publishOutboxEvent(second)
first
}
}
override suspend fun complete(id: UUID): Order = coroutineScopeWithObservation(COMPLETE, or) { observation ->
txOp.executeAndAwait {
val order = orderRepository.findOrderByID(id)
order.complete()
val updatedOrder = orderRepository.update(order)
log.info("order submitted: ${updatedOrder.status} for id: $id")
Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCompletedEventOf(updatedOrder)))
}.run {
observation.highCardinalityKeyValue("order", first.toString())
observation.highCardinalityKeyValue("outboxEvent", second.toString())
publishOutboxEvent(second)
first
}
}
@Transactional(readOnly = true)
override suspend fun getOrderWithProductsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation ->
orderRepository.getOrderWithProductItemsByID(id).also { observation.highCardinalityKeyValue("order", it.toString()) }
}
override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation ->
orderMongoRepository.getAllOrders(pageable).also { observation.highCardinalityKeyValue("pageResult", it.toString()) }
}
override suspend fun deleteOutboxRecordsWithLock() = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation ->
outboxRepository.deleteOutboxRecordsWithLock {
observation.highCardinalityKeyValue("outboxEvent", it.toString())
eventsPublisher.publish(getTopicName(it.eventType), it)
}
}
override suspend fun getOrderByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation ->
orderMongoRepository.getByID(id.toString())
.also { log.info("getOrderByID: $it") }
.also { observation.highCardinalityKeyValue("order", it.toString()) }
}
private suspend fun publishOutboxEvent(event: OutboxRecord) = coroutineScopeWithObservation(PUBLISH_OUTBOX_EVENT, or) { observation ->
try {
log.info("publishing outbox event: $event")
outboxRepository.deleteOutboxRecordByID(event.eventId!!) {
eventsPublisher.publish(getTopicName(event.eventType), event.aggregateId.toString(), event)
}
log.info("outbox event published and deleted: $event")
observation.highCardinalityKeyValue("event", event.toString())
} catch (ex: Exception) {
log.error("exception while publishing outbox event: ${ex.localizedMessage}")
observation.error(ex)
}
}
}
Repositori Order dan Product Items Postgres adalah kombinasi dari CoroutineCrudRepository
dan implementasi kustom menggunakan DatabaseClient
Dan R2dbcEntityTemplate
mendukung penguncian optimis dan pesimis, tergantung pada persyaratan metode.
@Repository
interface OrderRepository : CoroutineCrudRepository<OrderEntity, UUID>, OrderBaseRepository
@Repository
interface OrderBaseRepository {
suspend fun getOrderWithProductItemsByID(id: UUID): Order
suspend fun updateVersion(id: UUID, newVersion: Long): Long
suspend fun findOrderByID(id: UUID): Order
suspend fun insert(order: Order): Order
suspend fun update(order: Order): Order
}
@Repository
class OrderBaseRepositoryImpl(
private val dbClient: DatabaseClient,
private val entityTemplate: R2dbcEntityTemplate,
private val or: ObservationRegistry
) : OrderBaseRepository {
override suspend fun updateVersion(id: UUID, newVersion: Long): Long = coroutineScopeWithObservation(UPDATE_VERSION, or) { observation ->
dbClient.sql("UPDATE microservices.orders SET version = (version + 1) WHERE id = :id AND version = :version")
.bind(ID, id)
.bind(VERSION, newVersion - 1)
.fetch()
.rowsUpdated()
.awaitSingle()
.also { log.info("for order with id: $id version updated to $newVersion") }
.also {
observation.highCardinalityKeyValue("id", id.toString())
observation.highCardinalityKeyValue("newVersion", newVersion.toString())
}
}
override suspend fun getOrderWithProductItemsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation ->
dbClient.sql(
"""SELECT o.id, o.email, o.status, o.address, o.version, o.payment_id, o.created_at, o.updated_at,
|pi.id as productId, pi.price, pi.title, pi.quantity, pi.order_id, pi.version as itemVersion, pi.created_at as itemCreatedAt, pi.updated_at as itemUpdatedAt
|FROM microservices.orders o
|LEFT JOIN microservices.product_items pi on o.id = pi.order_id
|WHERE o.id = :id""".trimMargin()
)
.bind(ID, id)
.map { row, _ -> Pair(OrderEntity.of(row), ProductItemEntity.of(row)) }
.flow()
.toList()
.let { orderFromList(it) }
.also {
log.info("getOrderWithProductItemsByID order: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun findOrderByID(id: UUID): Order = coroutineScopeWithObservation(FIND_ORDER_BY_ID, or) { observation ->
val query = Query.query(Criteria.where(ID).`is`(id))
entityTemplate.selectOne(query, OrderEntity::class.java).awaitSingleOrNull()?.toOrder()
.also { observation.highCardinalityKeyValue("order", it.toString()) }
?: throw OrderNotFoundException(id)
}
override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation ->
entityTemplate.insert(order.toEntity()).awaitSingle().toOrder()
.also {
log.info("inserted order: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation ->
entityTemplate.update(order.toEntity()).awaitSingle().toOrder()
.also {
log.info("updated order: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
}
interface ProductItemBaseRepository {
suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity
suspend fun insertAll(productItemEntities: List<ProductItemEntity>): List<ProductItemEntity>
suspend fun upsert(productItem: ProductItem): ProductItem
}
@Repository
class ProductItemBaseRepositoryImpl(
private val entityTemplate: R2dbcEntityTemplate,
private val or: ObservationRegistry,
) : ProductItemBaseRepository {
override suspend fun upsert(productItem: ProductItem): ProductItem = coroutineScopeWithObservation(UPDATE, or) { observation ->
val query = Query.query(
Criteria.where("id").`is`(UUID.fromString(productItem.id))
.and("order_id").`is`(UUID.fromString(productItem.orderId))
)
val product = entityTemplate.selectOne(query, ProductItemEntity::class.java).awaitSingleOrNull()
if (product != null) {
val update = Update
.update("quantity", (productItem.quantity + product.quantity))
.set("version", product.version + 1)
.set("updated_at", LocalDateTime.now())
val updatedProduct = product.copy(quantity = (productItem.quantity + product.quantity), version = product.version + 1)
val updateResult = entityTemplate.update(query, update, ProductItemEntity::class.java).awaitSingle()
log.info("updateResult product: $updateResult")
log.info("updateResult updatedProduct: $updatedProduct")
return@coroutineScopeWithObservation updatedProduct.toProductItem()
}
entityTemplate.insert(ProductItemEntity.of(productItem)).awaitSingle().toProductItem()
.also { productItem ->
log.info("saved productItem: $productItem")
observation.highCardinalityKeyValue("productItem", productItem.toString())
}
}
override suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity = coroutineScopeWithObservation(INSERT, or) { observation ->
val product = entityTemplate.insert(productItemEntity).awaitSingle()
log.info("saved product: $product")
observation.highCardinalityKeyValue("product", product.toString())
product
}
override suspend fun insertAll(productItemEntities: List<ProductItemEntity>) = coroutineScopeWithObservation(INSERT_ALL, or) { observation ->
val result = productItemEntities.map { entityTemplate.insert(it) }.map { it.awaitSingle() }
log.info("inserted product items: $result")
observation.highCardinalityKeyValue("result", result.toString())
result
}
}
Detail penting di sini adalah untuk dapat menangani kasus beberapa instance Pod yang sedang diproses dalam tabel kotak keluar paralel. Kami memiliki konsumen idempoten, tetapi kami perlu menghindari pemrosesan kejadian tabel yang sama berkali-kali. Untuk menghindari beberapa contoh memilih dan memposting acara yang sama, kami menggunakan FOR UPDATE SKIP LOCKED.
Kombinasi ini mencoba untuk memilih sekumpulan kejadian kotak keluar. Jika instance lain telah memilih record ini, pertama-tama kita akan mengabaikan record yang dikunci dan memilih record berikutnya yang tersedia dan tidak terkunci, dan seterusnya.
@Repository
interface OutboxBaseRepository {
suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long
suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit)
}
class OutboxBaseRepositoryImpl(
private val dbClient: DatabaseClient,
private val txOp: TransactionalOperator,
private val or: ObservationRegistry,
private val transactionalOperator: TransactionalOperator
) : OutboxBaseRepository {
override suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long =
coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_BY_ID, or) { observation ->
withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) {
txOp.executeAndAwait {
callback()
dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId")
.bind("eventId", id)
.fetch()
.rowsUpdated()
.awaitSingle()
.also {
log.info("outbox event with id: $it deleted")
observation.highCardinalityKeyValue("id", it.toString())
}
}
}
}
override suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit) =
coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation ->
withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) {
txOp.executeAndAwait {
dbClient.sql("SELECT * FROM microservices.outbox_table ORDER BY timestamp ASC LIMIT 10 FOR UPDATE SKIP LOCKED")
.map { row, _ -> OutboxRecord.of(row) }
.flow()
.onEach {
log.info("deleting outboxEvent with id: ${it.eventId}")
callback(it)
dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId")
.bind("eventId", it.eventId!!)
.fetch()
.rowsUpdated()
.awaitSingle()
log.info("outboxEvent with id: ${it.eventId} published and deleted")
observation.highCardinalityKeyValue("eventId", it.eventId.toString())
}
.collect()
}
}
}
}
ITU produser survei implementasinya adalah proses terjadwal yang melakukan tugas yang sama untuk memposting dan menghapus acara pada interval tertentu seperti yang diketik sebelumnya dan menggunakan metode layanan yang sama:
@Component
@ConditionalOnProperty(prefix = "schedulers", value = ["outbox.enable"], havingValue = "true")
class OutboxScheduler(private val orderService: OrderService, private val or: ObservationRegistry) {
@Scheduled(initialDelayString = "\${schedulers.outbox.initialDelayMillis}", fixedRateString = "\${schedulers.outbox.fixedRate}")
fun publishAndDeleteOutboxRecords() = runBlocking {
coroutineScopeWithObservation(PUBLISH_AND_DELETE_OUTBOX_RECORDS, or) {
log.debug("starting scheduled outbox table publishing")
orderService.deleteOutboxRecordsWithLock()
log.debug("completed scheduled outbox table publishing")
}
}
companion object {
private val log = LoggerFactory.getLogger(OutboxScheduler::class.java)
private const val PUBLISH_AND_DELETE_OUTBOX_RECORDS = "OutboxScheduler.publishAndDeleteOutboxRecords"
}
}
Biasanya, kotak keluar transaksional lebih sering diperlukan untuk memastikan konsistensi data antar layanan mikro. Di sini, misalnya, konsumen dari layanan mikro yang sama memprosesnya dan menyimpannya ke MongoDB. Detail terpenting di sini, mengingat bahwa kami memproses peristiwa Kafka dalam beberapa proses konsumsi, kemungkinan kasus penggunaan saat urutan pemrosesan peristiwa dapat diacak. Di Kafka kami memiliki fitur kunci, dan ini membantu kami karena mengirim pesan dengan kunci yang sama ke sebuah partisi. Namun jika broker tidak memiliki fitur ini, kita harus mengaturnya secara manual. Kasus di mana, misalnya, pertama, beberapa konsumen mencoba memproses peristiwa #6 sebelum peristiwa #4 dan #5 diproses. Untuk alasan ini, miliki bidang versi entitas domain di acara kotak keluar, jadi kami hanya dapat melihat versi dan memvalidasi jika di database kami memiliki pesanan versi #3, tetapi sekarang penanganan acara dengan versi #6, pertama kami harus menunggu #4, #5 dan proses terlebih dahulu, tetapi tentu saja perincian ini bergantung pada setiap logika bisnis konkret aplikasi, di sini hanya menunjukkan gagasan bahwa ini adalah kasus yang memungkinkan. Dan detail penting lainnya — adalah coba topik lagi. Jika kami perlu mencoba kembali proses pesan, lebih baik membuat topik coba ulang dan menangani coba ulang di sini, berapa lama untuk mencoba kembali dan detail logis lanjutan lainnya sesuai dengan kasus konkret Anda. Dalam contoh, kami memiliki dua pendengar. Di mana salah satunya adalah untuk coba topik lagi pemrosesan pesan:
@Component
class OrderConsumer(
private val kafkaTopicsConfiguration: KafkaTopicsConfiguration,
private val serializer: Serializer,
private val eventsPublisher: EventsPublisher,
private val orderEventProcessor: OrderEventProcessor,
private val or: ObservationRegistry,
) {
@KafkaListener(
groupId = "\${kafka.consumer-group-id:order-service-group-id}",
topics = [
"\${topics.orderCreated.name}",
"\${topics.productAdded.name}",
"\${topics.productRemoved.name}",
"\${topics.orderPaid.name}",
"\${topics.orderCancelled.name}",
"\${topics.orderSubmitted.name}",
"\${topics.orderCompleted.name}",
],
id = "orders-consumer"
)
fun process(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>) = runBlocking {
coroutineScopeWithObservation(PROCESS, or) { observation ->
try {
observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java))
ack.acknowledge()
log.info("committed record: ${getConsumerRecordInfo(consumerRecord)}")
} catch (ex: Exception) {
observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
observation.error(ex)
if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) {
log.error("ack not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
ack.acknowledge()
return@coroutineScopeWithObservation
}
if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) {
publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1)
ack.acknowledge()
log.warn("ack concurrency write or version exception ${ex.localizedMessage}")
return@coroutineScopeWithObservation
}
publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1)
ack.acknowledge()
log.error("ack exception while processing record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}", ex)
}
}
}
@KafkaListener(groupId = "\${kafka.consumer-group-id:order-service-group-id}", topics = ["\${topics.retryTopic.name}"], id = "orders-retry-consumer")
fun processRetry(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>): Unit = runBlocking {
coroutineScopeWithObservation(PROCESS_RETRY, or) { observation ->
try {
log.warn("processing retry topic record >>>>>>>>>>>>> : ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java))
ack.acknowledge()
log.info("committed retry record: ${getConsumerRecordInfo(consumerRecord)}")
} catch (ex: Exception) {
observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
observation.error(ex)
val currentRetry = String(consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER).value()).toInt()
observation.highCardinalityKeyValue("currentRetry", currentRetry.toString())
if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) {
publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry)
log.warn("ack concurrency write or version exception ${ex.localizedMessage},record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
ack.acknowledge()
return@coroutineScopeWithObservation
}
if (currentRetry > MAX_RETRY_COUNT) {
publishRetryTopic(kafkaTopicsConfiguration.deadLetterQueue.name, consumerRecord, currentRetry + 1)
ack.acknowledge()
log.error("MAX_RETRY_COUNT exceed, send record to DLQ: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
return@coroutineScopeWithObservation
}
if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) {
ack.acknowledge()
log.error("commit not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
return@coroutineScopeWithObservation
}
log.error("exception while processing: ${ex.localizedMessage}, record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry + 1)
ack.acknowledge()
}
}
}
private suspend fun publishRetryTopic(topic: String, record: ConsumerRecord<String, ByteArray>, retryCount: Int) =
coroutineScopeWithObservation(PUBLISH_RETRY_TOPIC, or) { observation ->
observation.highCardinalityKeyValue("topic", record.topic())
.highCardinalityKeyValue("key", record.key())
.highCardinalityKeyValue("offset", record.offset().toString())
.highCardinalityKeyValue("value", String(record.value()))
.highCardinalityKeyValue("retryCount", retryCount.toString())
record.headers().remove(RETRY_COUNT_HEADER)
record.headers().add(RETRY_COUNT_HEADER, retryCount.toString().toByteArray())
mono { publishRetryRecord(topic, record, retryCount) }
.retryWhen(Retry.backoff(PUBLISH_RETRY_COUNT, Duration.ofMillis(PUBLISH_RETRY_BACKOFF_DURATION_MILLIS))
.filter { it is SerializationException })
.awaitSingle()
}
}
Peran dari prosesor acara perintah ke layanan mikro ini memvalidasi versi dan pembaruan acara MongoDB :
interface OrderEventProcessor {
suspend fun on(orderCreatedEvent: OrderCreatedEvent)
suspend fun on(productItemAddedEvent: ProductItemAddedEvent)
suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent)
suspend fun on(orderPaidEvent: OrderPaidEvent)
suspend fun on(orderCancelledEvent: OrderCancelledEvent)
suspend fun on(orderSubmittedEvent: OrderSubmittedEvent)
suspend fun on(orderCompletedEvent: OrderCompletedEvent)
}
@Service
class OrderEventProcessorImpl(
private val orderMongoRepository: OrderMongoRepository,
private val or: ObservationRegistry,
) : OrderEventProcessor {
override suspend fun on(orderCreatedEvent: OrderCreatedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CREATED_EVENT, or) { observation ->
orderMongoRepository.insert(orderCreatedEvent.order).also {
log.info("created order: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun on(productItemAddedEvent: ProductItemAddedEvent): Unit =
coroutineScopeWithObservation(ON_ORDER_PRODUCT_ADDED_EVENT, or) { observation ->
val order = orderMongoRepository.getByID(productItemAddedEvent.orderId)
validateVersion(order.id, order.version, productItemAddedEvent.version)
order.addProductItem(productItemAddedEvent.productItem)
order.version = productItemAddedEvent.version
orderMongoRepository.update(order).also {
log.info("productItemAddedEvent updatedOrder: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent): Unit =
coroutineScopeWithObservation(ON_ORDER_PRODUCT_REMOVED_EVENT, or) { observation ->
val order = orderMongoRepository.getByID(productItemRemovedEvent.orderId)
validateVersion(order.id, order.version, productItemRemovedEvent.version)
order.removeProductItem(productItemRemovedEvent.productItemId)
order.version = productItemRemovedEvent.version
orderMongoRepository.update(order).also {
log.info("productItemRemovedEvent updatedOrder: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun on(orderPaidEvent: OrderPaidEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PAID_EVENT, or) { observation ->
val order = orderMongoRepository.getByID(orderPaidEvent.orderId)
validateVersion(order.id, order.version, orderPaidEvent.version)
order.pay(orderPaidEvent.paymentId)
order.version = orderPaidEvent.version
orderMongoRepository.update(order).also {
log.info("orderPaidEvent updatedOrder: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun on(orderCancelledEvent: OrderCancelledEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CANCELLED_EVENT, or) { observation ->
val order = orderMongoRepository.getByID(orderCancelledEvent.orderId)
validateVersion(order.id, order.version, orderCancelledEvent.version)
order.cancel()
order.version = orderCancelledEvent.version
orderMongoRepository.update(order).also {
log.info("orderCancelledEvent updatedOrder: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun on(orderSubmittedEvent: OrderSubmittedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_SUBMITTED_EVENT, or) { observation ->
val order = orderMongoRepository.getByID(orderSubmittedEvent.orderId)
validateVersion(order.id, order.version, orderSubmittedEvent.version)
order.submit()
order.version = orderSubmittedEvent.version
orderMongoRepository.update(order).also {
log.info("orderSubmittedEvent updatedOrder: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
override suspend fun on(orderCompletedEvent: OrderCompletedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_COMPLETED_EVENT, or) { observation ->
val order = orderMongoRepository.getByID(orderCompletedEvent.orderId)
validateVersion(order.id, order.version, orderCompletedEvent.version)
order.complete()
order.version = orderCompletedEvent.version
orderMongoRepository.update(order).also {
log.info("orderCompletedEvent updatedOrder: $it")
observation.highCardinalityKeyValue("order", it.toString())
}
}
private fun validateVersion(id: Any, currentDomainVersion: Long, eventVersion: Long) {
log.info("validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
if (currentDomainVersion >= eventVersion) {
log.warn("currentDomainVersion >= eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
throw AlreadyProcessedVersionException(id, eventVersion)
}
if ((currentDomainVersion + 1) < eventVersion) {
log.warn("currentDomainVersion + 1) < eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
throw InvalidVersionException(eventVersion)
}
}
}
Kode repositori MongoDB cukup sederhana:
interface OrderMongoRepository {
suspend fun insert(order: Order): Order
suspend fun update(order: Order): Order
suspend fun getByID(id: String): Order
suspend fun getAllOrders(pageable: Pageable): Page<Order>
}
@Repository
class OrderMongoRepositoryImpl(
private val mongoTemplate: ReactiveMongoTemplate,
private val or: ObservationRegistry,
) : OrderMongoRepository {
override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation ->
withContext(Dispatchers.IO) {
mongoTemplate.insert(OrderDocument.of(order)).awaitSingle().toOrder()
.also { log.info("inserted order: $it") }
.also { observation.highCardinalityKeyValue("order", it.toString()) }
}
}
override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation ->
withContext(Dispatchers.IO) {
val query = Query.query(Criteria.where(ID).`is`(order.id).and(VERSION).`is`(order.version - 1))
val update = Update()
.set(EMAIL, order.email)
.set(ADDRESS, order.address)
.set(STATUS, order.status)
.set(VERSION, order.version)
.set(PAYMENT_ID, order.paymentId)
.set(PRODUCT_ITEMS, order.productsList())
val options = FindAndModifyOptions.options().returnNew(true).upsert(false)
val updatedOrderDocument = mongoTemplate.findAndModify(query, update, options, OrderDocument::class.java)
.awaitSingleOrNull() ?: throw OrderNotFoundException(order.id.toUUID())
observation.highCardinalityKeyValue("order", updatedOrderDocument.toString())
updatedOrderDocument.toOrder().also { orderDocument -> log.info("updated order: $orderDocument") }
}
}
override suspend fun getByID(id: String): Order = coroutineScopeWithObservation(GET_BY_ID, or) { observation ->
withContext(Dispatchers.IO) {
mongoTemplate.findById(id, OrderDocument::class.java).awaitSingle().toOrder()
.also { log.info("found order: $it") }
.also { observation.highCardinalityKeyValue("order", it.toString()) }
}
}
override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation ->
withContext(Dispatchers.IO) {
val query = Query().with(pageable)
val data = async { mongoTemplate.find(query, OrderDocument::class.java).collectList().awaitSingle() }.await()
val count = async { mongoTemplate.count(Query(), OrderDocument::class.java).awaitSingle() }.await()
PageableExecutionUtils.getPage(data.map { it.toOrder() }, pageable) { count }
.also { observation.highCardinalityKeyValue("pageResult", it.pageable.toString()) }
}
}
}
Anda akan menemukan detail lebih lanjut dan kode sumber dari proyek lengkap di sini di repositori GitHub . Dalam aplikasi dunia nyata kita perlu mengimplementasikan banyak fitur lain yang diperlukan seperti pemeriksaan kesehatan k8s, pembatas tarif, dll. Bergantung pada proyeknya, ini dapat diimplementasikan dengan cara yang berbeda. Misalnya, Anda dapat menggunakan Kubernetes dan Istio untuk beberapa di antaranya. Saya harap artikel ini bermanfaat dan bermanfaat, dan saya menyambut setiap komentar atau pertanyaan. Jangan ragu untuk menghubungi Aku oleh Surel atau apapun utusan 🙂