Flazzo memiliki fokus utama untuk menambah nilai bisnis Anda.

Blog

Memproduksi dan Mengkonsumsi Pesan Avro dengan Redpanda Schema Registry

16851955-thumb.jpg
Blog

Memproduksi dan Mengkonsumsi Pesan Avro dengan Redpanda Schema Registry


Jika Anda akrab dengan Apache Kafka®Anda mungkin menemukan registri skema yang kompatibel dengan Kafka, komponen terpisah yang Anda gunakan di luar klaster Kafka Anda, karena Kafka sendiri tidak memilikinya.

Pada dasarnya, skema adalah deskripsi logis tentang bagaimana data Anda diatur, dan oleh karena itu a daftar skema menyediakan pusat penyimpanan untuk skema ini, memungkinkan produsen dan konsumen untuk mengirim dan menerima data satu sama lain dengan mulus. Untuk arsitektur yang digerakkan oleh peristiwa, ini bisa menjadi kompleks dan sulit dikelola saat Anda menskalakan, karena skema data dapat berubah dan berkembang seiring waktu (berpotensi merusak banyak hal).

Di sinilah Redpanda masuk. Redpanda adalah platform data streaming berkemampuan Kafka API yang dirancang dari bawah ke atas agar cepat, sederhana, dan hemat biaya. Sejalan dengan misi ini, Redpanda hadir dengan skema registry bawaan sehingga Anda dapat menyimpan, mengontrol versi, dan memvalidasi skema aplikasi Anda secara real time tanpa menggunakan atau mengelola apa pun selain kluster Redpanda.

Untuk memberi Anda ikhtisar tentang cara kerjanya, artikel ini memandu Anda membuat pemroses aliran klik sederhana menggunakan Registri Skema Redpanda memproduksi dan mengkonsumsi Apache Avro™ pesan. Kami memilih Avro karena merupakan pilihan paling populer untuk serialisasi data dalam format biner yang ringkas dan mendukung evolusi skema. Tutorial ini hanya memiliki lima langkah dan semua yang Anda butuhkan ada di dalamnya repositori GitHub ini.

Siap? Ayo mulai.

Cara Membuat Aplikasi Pengolah Clickstream

Sudah waktunya untuk turun ke kode. Hanya dalam lima langkah, kita akan membuat klaster Redpanda node tunggal Konsol Redpanda dengan Docker Compose, lalu gunakan klien SDK yang kompatibel dengan registri skema (confluent-kafka) untuk membuat dan menggunakan pesan Avro ke Redpanda. SDK akan melakukan sebagian besar pekerjaan berat, termasuk pendaftaran skema dan pemeriksaan kompatibilitas skema.

Singkatnya, aplikasi Python produksi mengumpulkan peristiwa interaksi pengguna dari aplikasi web, membuat serialnya ke Avro, dan menerbitkannya ke topik clickstream Redpanda. Aplikasi Python lain mengkonsumsinya dari Redpanda, melakukan deserialisasi, dan menggunakannya untuk menganalisis perilaku pengguna.

Dari produsen ke konsumen

Format acara ClickStream contoh akan terlihat seperti ini:

{
   "user_id":2323,
   "event_type":"BUTTON_CLICK",
   "ts":"2018-11-12 01:02:03.123456789"
}

Untuk meniru kasus penggunaan di atas, kami akan menulis produser dan konsumen sederhana dengan Python yang menghasilkan dan menggunakan pesan dari topik clickstream Redpanda. Mereka akan menggunakan API REST PandaProxy untuk berkomunikasi dengan registri skema.

Sebelum Anda mulai, pastikan Anda memilikinya Desktop Docker dan Python 3 (dengan pip) terpasang di mesin lokal Anda.

1. Kloning repositori GitHub

Repositori GitHub ini berisi beberapa artefak untuk memulai tutorial. Ini akan membantu Anda memulai dengan segera. Kami akan membuat artefak kode tambahan saat kami melanjutkan tutorial.

Jalankan perintah berikut untuk mengkloning repositori ke komputer lokal Anda.

git clone  code
cd code

2. Instal dependensi Python

Kami akan menginstal beberapa pustaka Python yang digunakan oleh klien Python yang akan Anda lihat nanti di tutorial ini. Anda akan menemukannya di file requirements.txt di tingkat akar repositori. Untuk mencegah konflik dengan dependensi lokal Anda, mari buat lingkungan virtual baru untuk mereka Jum.

Jalankan perintah berikut di jendela terminal.

python3 -m venv env
source env/bin/activate
pip install --upgrade pip
pip install -r requirements.txt

Perhatikan juga bahwa kami menggunakan konfluen-kafka Python SDK untuk semua komunikasi API dengan Redpanda Schema Registry. Ini adalah SDK yang kompatibel dengan Registri Skema yang juga kompatibel dengan Registri Skema Confluent. Untuk alasan ini, confluent-kafka akan melakukan banyak pekerjaan untuk kita, seperti menambahkan padding untuk setiap pesan yang terdiri dari byte ajaib dan schemaID. Juga, secara otomatis dapat mendaftarkan skema dengan registri.

Manfaat lainnya adalah Anda menggunakan Redpanda Schema Registry dengan klien Confluent SDK Anda, tanpa perlu mengubah kode apa pun.

3. Mulai klaster Redpanda

Kemudian kita akan menggunakan Komposer Docker untuk membuat kluster Redpanda.

Temukan itu docker-compose.yml file di tingkat root dari repositori kloning dan jalankan perintah berikut di terminal.

Ini akan membuat cluster Redpanda node tunggal dengan Konsol Redpanda. Node Redpanda ini berisi registri skema bawaan. Anda dapat secara visual menjelajahi definisi skema yang disimpan dalam registri skema dengan konsol Redpanda.

Akses konsol dengan masuk ke Klik Registri Skema di bilah sisi untuk melihat definisi skema.

Registri Skema Redpanda

Anda akan melihat layar kosong karena kami memulai cluster dari awal.

4. Tulis kode produsen

Sekarang kami memiliki kluster Redpanda yang berfungsi, langkah selanjutnya adalah menghasilkan pesan dalam format Avro.

Producer.py di repositori kloning berisi kode Python untuk menghasilkan acara. Kontennya akan terlihat seperti ini:

import json
from uuid import uuid4
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroProducer 
from confluent_kafka import avro

def delivery_callback(error, message):
    if error:
        print("Failed to send the message: %s" % error)
    else:
        print(f"Message with the key {message.key()} has been produced to the topic {message.topic()}")

def load_avro_schema_from_file():
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load('./schemas/click_event.avsc')

    return key_schema, value_schema

def produce():
    config = {
        'bootstrap.servers' : "localhost:9092",
        'schema.registry.url' : "
    }

    key_schema, value_schema = load_avro_schema_from_file()
    
    producer = AvroProducer(
        config,
        default_key_schema = key_schema,
        default_value_schema = value_schema
    )

    try:
        key = str(uuid4())
        value_str="{"user_id":2,"event_type":"CLICK","ts":"2021-12-12"}"
        value = json.loads(value_str) 

        producer.produce(
            topic = "clickstream",
            key = key,
            value = value,
            on_delivery = delivery_callback
        )

        producer.poll(10000)
        producer.flush()
    
    except KafkaException as e:
        print("Error occurred during message production:", e)
    
    print("Done!")

def main():
    produce()

if __name__ == "__main__":
    main()

Sebagian besar metode hanyalah kode boilerplate dan cukup jelas. Jadi mari kita tinjau metode paling penting untuk serialisasi.

Pertama, kami meneruskan URL skema ke klien SDK dengan menyetel properti konfigurasi, schema.registry.url

Kemudian load_avro_schema_from_file() Metode mengembalikan dua skema untuk peristiwa ClickStream: skema kunci dan skema nilai.

def load_avro_schema_from_file():
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load('./schemas/click_event.avsc')

    return key_schema, value_schema

Perhatikan bahwa skema nilai dimuat dari skema/click_event.avsc file di repositori. File ini berisi definisi skema Avro berikut yang mendefinisikan struktur acara ClickStream.

{
    "type" : "record",
    "namespace" : "com.redpanda.examples.avro",
    "name" : "ClickEvent",
    "fields" : [
       { "name": "user_id", "type" : "int" },
       { "name": "event_type", "type" : "string" },
       { "name": "ts", "type": "string" }
    ]
 }

Setelah dua skema diturunkan, mereka diteruskan sebagai argumen ke serializer SDK (AvroProducer). Jika Anda ingat alur kerja serialisasi di atas, di sinilah tempatnya AvroProducer meminta schemaID untuk topik tersebut, KlikAcara. Di sini, nama subjek diambil dari bidang nama, yang ditentukan dalam definisi skema Avro.

producer = AvroProducer(
        config,
        default_key_schema = key_schema,
        default_value_schema = value_schema
    )

Saat kode produser dijalankan untuk pertama kalinya, file AvroProducer secara otomatis mendaftarkan skema ClickEvent di registri skema dan mengambil schemaID, yang kebetulan adalah 1. ID skema ini unik di kluster Redpanda, dan Anda dapat menggunakannya untuk mengambil skema nanti.

Pemanggilan selanjutnya akan membaca schemaID dari cache.

Kemudian jalankan file tersebut di terminal untuk menghasilkan pesan Avro.

Masuk ke Konsol Redpanda Topik halaman untuk melihat apakah clickstream topik diisi dengan satu peristiwa.

Cuplikan layar halaman Topik di Konsol Redpanda

Cuplikan layar halaman Topik di Konsol Redpanda

Perhatikan bahwa konsol Redpanda dapat membatalkan serialisasi pesan Avro untuk Anda, menunjukkan kepada Anda konten payload (nilai) peristiwa clickstream karena kami menggunakan SDK Python yang kompatibel dengan registri skema. Karena klien produsen dan konsol menggunakan registri skema yang sama, konsol dapat menentukan skema mana yang akan digunakan untuk deserialisasi dengan memeriksa bagian schemaID yang dibawa di setiap pesan.

Kemudian login ke Konsol Redpanda Daftar skema halaman untuk memverifikasi pendaftaran skema. Anda akan melihat clickstream-key dan clickstream-value definisi skema telah terdaftar di registri skema.

Cuplikan layar halaman Registri Skema di Konsol Redpanda

Sekarang kami memiliki pesan berformat Avro di clickstream subjek, mari deserialize mereka dengan konsumen Python.

ITU konsumen.py File dalam repositori berisi kode untuk menggunakan topik aliran klik, deserialisasi pesan, dan mencetak kontennya ke terminal. Isi file akan terlihat seperti ini:

import json
from confluent_kafka import KafkaException
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import avro 

def consume():
    config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": ",
        "group.id": "my-connsumer1",
        "auto.offset.reset": "earliest"
    }

    consumer = AvroConsumer(config)
    consumer.subscribe(["clickstream"])

    while True:
      try:
        msg = consumer.poll(1)

        if msg is None:
          continue

        print("Key is :" + json.dumps(msg.key()))
        print("Value is :" + json.dumps(msg.value()))
        print("-------------------------")

      except KafkaException as e:
        print('Kafka failure ' + e)

    consumer.close()

def main():
    consume()

if __name__ == '__main__':
    main()

Kode ini cukup jelas. Seperti yang kami lakukan dengan produser, URL registri skema dikonfigurasikan dengan objek konfigurasi dan diteruskan ke deserializer Avro, AvroConsumeryang berlangganan ke clickstream subjek.

Hanya itu yang perlu Anda ketahui. Sisanya, termasuk penemuan ID skema, pengambilan skema, dan akhirnya deserialisasi akan diurus oleh AvroConsumer.

Jalankan file di terminal.

Anda akan melihat satu peristiwa dikembalikan, dengan kontennya dideserialisasi sebagai berikut.

Key is :"39950858-1cfd-4d56-a3ac-2bde1c806f6f"
Value is :{"user_id": 2, "event_type": "CLICK", "ts": "2021-12-12"}

Sederhanakan Pendaftaran Skema di Kafka dengan Redpanda

Jika Anda sudah sampai sejauh ini, beri tepuk tangan pada diri Anda sendiri karena Anda baru saja menggunakan registri skema bawaan Redpanda untuk membuat dan menggunakan pesan Avro! Ini hanyalah salah satu cara kerja Redpanda untuk membuat streaming data di Kafka lebih cepat dan lebih mudah. Tidak ada binari baru untuk diinstal, tidak ada layanan baru untuk digunakan dan dipelihara, dan konfigurasi default hanya bekerja.

Jangan ragu untuk menyesuaikan lebih lanjut contoh pemrosesan clickstream ini dan bermain dengan format skema data lainnya, seperti Protobuf. Untuk mempelajari lebih lanjut tentang Redpanda, lihat dokumentasi kami.