Flazzo memiliki fokus utama untuk menambah nilai bisnis Anda.

Blog

Manajemen beban data simultan di tabel delta

18275403-thumb.jpg
Blog

Manajemen beban data simultan di tabel delta

[ad_1]

Pastikan entri simultan yang andal dengan opsi untuk dicoba lagi

Danau Delta adalah lapisan penyimpanan tangguh yang menawarkan transaksi asam, aplikasi skema, dan versi data. Namun, entri simultan menghasilkan afirmasi karena proses yang berbeda mencoba menulis, memperbarui atau menghapus pada saat yang sama. Proses ini menawarkan mekanisme untuk mencoba terstruktur dengan backoff eksponensial untuk mengelola persaingan di tabel Delta.

Delta Competient Table menulis pertanyaan

Kegagalan kompetisi terjadi ketika beberapa proses berusaha secara bersamaan untuk menulis ke tabel delta yang sama. Skenario kegagalan umum adalah sebagai berikut:

  • Kompetisi – Ketika pekerjaan simultan secara bersamaan menambahkan rekaman, dengan kunci yang bertentangan.
  • Pesaing OfleTerAreAxception – Jika suatu proses mencoba membaca data yang dihapus oleh proses lain.
  • Pesaing Desain – Ketika dua proses mencoba menghapus data yang sama secara bersamaan.

Pertanyaan -pertanyaan ini harus memiliki instalasi untuk mencoba di mana -mana lagi yang membuat entri terjadi dengan konsistensi.

Mekanisme mencoba yang diusulkan untuk entri tabel delta

Prosedur untuk mencoba streaming digunakan untuk mengurangi kegagalan penulisan simultan menggunakan backoff eksponensial.

Kode Python berikut menjelaskan prosesnya:

from datetime import datetime
from time import sleep
from delta.exceptions import (
    ConcurrentAppendException,
    ConcurrentDeleteReadException,
    ConcurrentDeleteDeleteException,
)
import math

def streaming_write_with_concurrent_retry(
    stream, max_attempts=3, indefinite=False, table=None, path=None
):
    """
    Handles concurrent write operations to a Delta table or path by retrying the operation
    in case of specific concurrent exceptions.

    :param stream: The data stream to be written.
    :param max_attempts: The maximum number of retry attempts. Default is 3.
    :param indefinite: If True, will keep retrying indefinitely. Default is False.
    :param table: The Delta table to write to.
    :param path: The path to write to.
    :return: The result of writer.awaitTermination().
    """

    attempt = 0  # Initialize attempt counter

    while True:
        try:
            # Choose the writer based on whether table or path is provided
            if table:
                writer = stream.table(table)
            elif path:
                writer = stream.start(path)
            else:
                writer = stream.start()

            # Attempt to write and wait for termination
            return writer.awaitTermination()

        # Handle concurrent exceptions
        except (
            ConcurrentAppendException,
            ConcurrentDeleteReadException,
            ConcurrentDeleteDeleteException,
        ) as e:

            # Increment attempt counter
            attempt += 1

            # If indefinite is False and attempts have reached max_attempts, raise the exception
            if not indefinite and attempt >= max_attempts:
                raise e from None

            # Calculate sleep time using exponential backoff strategy
            sleep_time = min(120, math.pow(2, attempt))

            # Log the retry attempt
            print(f"Retrying {attempt}/{max_attempts if not indefinite else '∞'} after {sleep_time} seconds due to {type(e).__name__}")

            # Sleep for the calculated time before retrying
            sleep(sleep_time)

Penjelasan tentang strategi mencoba lagi

Kebijakan untuk mencoba lagi mengikuti kemunduran eksponensial:

1. Identifikasi pengecualian

Fungsi penangkapan ConcurrentAppendException,, ConcurrentDeleteReadExceptionDan ConcurrentDeleteDeleteException.

2. Upaya dan keterbatasan Reese

Menarik kembali maksimal max_attempts kali sebelum dia gagal. ITU indefinite=True Parameter memungkinkan upaya tanpa akhir untuk sukses.

3. Perhitungan terbalik eksponensial

Formula Backoff: sleep_time = min(120, 2^attempt). Ini memastikan bahwa waktu tunggu untuk mencoba lagi dikembangkan secara eksponensial tetapi dibatasi pada maksimum 120 detik.

Contoh mencoba waktu tunggu:

  • Upaya 1 → Tunggu 2 detik
  • Upaya 2 → Tunggu selama 4 detik
  • Upaya 3 → Tunggu 8 detik
  • (hingga maksimum 120 detik)

Lanjutkan aliran

Setelah upaya baru yang berhasil, writer.awaitTermination() Memungkinkan pekerjaan streaming untuk terus bekerja.

Strategi alternatif Untuk manajemen kompetisi meja delta

Selain resolusi konflik berdasarkan mencoba lagi, Danau Delta menawarkan teknik tambahan:

1. Kontrol Kompetisi Optimis (OCS)

Danau Delta memeriksa konflik sebelum mengatur transaksi. Jika ada konflik, itu akan secara otomatis mencoba operasi.

2. Partisi data untuk isolasi

Operasi penulisan harus ditargetkan pada partisi lain untuk menghindari tabrakan.

df.write.format("delta").mode("overwrite").option("replaceWhere", "date="2025-02-17"").save("/mnt/delta/table")

Ini membatasi pembaruan dari satu partisi, mengurangi afirmasi.

3. Streaming dan Optimasi Mandiri

Memungkinkan Auto-Optimize Dan Auto-Compact::

ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
)

Perubahan konfigurasi ini mengurangi file kecil dan meningkatkan kinerja penulisan simultan.

4. Gabungkan berdasarkan USERTS

Alih -alih sisipan langsung, gunakan MERGE Untuk mengelola konflik:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/mnt/delta/table_name")

delta_table.alias("target").merge(
    df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={"target.value": "source.value"}).whenNotMatchedInsertAll().execute()

Proses ini menjamin resolusi konflik di jalur.

Pantau masalah kompetisi debugging

1. Periksa riwayat transaksi pemuatan data:

DESCRIBE HISTORY delta.`/mnt/delta/table_name`;

2. Tampilkan / periksa kunci aktif:

SHOW TBLPROPERTIES delta.`/mnt/delta/table_name`;

3. Aktifkan perubahan data perubahan (CDF) untuk memantau perubahan:

ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES ('delta.enableChangeDataFeed' = true);

Kesimpulan

Tabel delta memungkinkan entri simultan dengan jaminan asam, tetapi konflik dimungkinkan.

  • Strategi yang didasarkan pada mencoba mencobanya dengan kemunduran eksponensial membantu mengurangi masalah kompetisi.
  • Partisi, MERGEDan Auto-Optimize Juga meningkatkan kinerja penulisan simultan.
  • Ikuti mekanisme -up seperti DESCRIBE HISTORY Dan CDF Ikuti konflik.

Dengan bergabung dengan praktik terbaik ini, proses ini dapat secara efektif melakukan entri delta simultan, mempertahankan integritas data dan mendapatkan optimasi kinerja.

[ad_2]