Manajemen beban data simultan di tabel delta

Manajemen beban data simultan di tabel delta
[ad_1]
Pastikan entri simultan yang andal dengan opsi untuk dicoba lagi
Danau Delta adalah lapisan penyimpanan tangguh yang menawarkan transaksi asam, aplikasi skema, dan versi data. Namun, entri simultan menghasilkan afirmasi karena proses yang berbeda mencoba menulis, memperbarui atau menghapus pada saat yang sama. Proses ini menawarkan mekanisme untuk mencoba terstruktur dengan backoff eksponensial untuk mengelola persaingan di tabel Delta.
Delta Competient Table menulis pertanyaan
Kegagalan kompetisi terjadi ketika beberapa proses berusaha secara bersamaan untuk menulis ke tabel delta yang sama. Skenario kegagalan umum adalah sebagai berikut:
- Kompetisi – Ketika pekerjaan simultan secara bersamaan menambahkan rekaman, dengan kunci yang bertentangan.
- Pesaing OfleTerAreAxception – Jika suatu proses mencoba membaca data yang dihapus oleh proses lain.
- Pesaing Desain – Ketika dua proses mencoba menghapus data yang sama secara bersamaan.
Pertanyaan -pertanyaan ini harus memiliki instalasi untuk mencoba di mana -mana lagi yang membuat entri terjadi dengan konsistensi.
Mekanisme mencoba yang diusulkan untuk entri tabel delta
Prosedur untuk mencoba streaming digunakan untuk mengurangi kegagalan penulisan simultan menggunakan backoff eksponensial.
Kode Python berikut menjelaskan prosesnya:
from datetime import datetime
from time import sleep
from delta.exceptions import (
ConcurrentAppendException,
ConcurrentDeleteReadException,
ConcurrentDeleteDeleteException,
)
import math
def streaming_write_with_concurrent_retry(
stream, max_attempts=3, indefinite=False, table=None, path=None
):
"""
Handles concurrent write operations to a Delta table or path by retrying the operation
in case of specific concurrent exceptions.
:param stream: The data stream to be written.
:param max_attempts: The maximum number of retry attempts. Default is 3.
:param indefinite: If True, will keep retrying indefinitely. Default is False.
:param table: The Delta table to write to.
:param path: The path to write to.
:return: The result of writer.awaitTermination().
"""
attempt = 0 # Initialize attempt counter
while True:
try:
# Choose the writer based on whether table or path is provided
if table:
writer = stream.table(table)
elif path:
writer = stream.start(path)
else:
writer = stream.start()
# Attempt to write and wait for termination
return writer.awaitTermination()
# Handle concurrent exceptions
except (
ConcurrentAppendException,
ConcurrentDeleteReadException,
ConcurrentDeleteDeleteException,
) as e:
# Increment attempt counter
attempt += 1
# If indefinite is False and attempts have reached max_attempts, raise the exception
if not indefinite and attempt >= max_attempts:
raise e from None
# Calculate sleep time using exponential backoff strategy
sleep_time = min(120, math.pow(2, attempt))
# Log the retry attempt
print(f"Retrying {attempt}/{max_attempts if not indefinite else '∞'} after {sleep_time} seconds due to {type(e).__name__}")
# Sleep for the calculated time before retrying
sleep(sleep_time)
Penjelasan tentang strategi mencoba lagi
Kebijakan untuk mencoba lagi mengikuti kemunduran eksponensial:
1. Identifikasi pengecualian
Fungsi penangkapan ConcurrentAppendException
,, ConcurrentDeleteReadException
Dan ConcurrentDeleteDeleteException
.
2. Upaya dan keterbatasan Reese
Menarik kembali maksimal max_attempts
kali sebelum dia gagal. ITU indefinite=True
Parameter memungkinkan upaya tanpa akhir untuk sukses.
3. Perhitungan terbalik eksponensial
Formula Backoff: sleep_time = min(120, 2^attempt)
. Ini memastikan bahwa waktu tunggu untuk mencoba lagi dikembangkan secara eksponensial tetapi dibatasi pada maksimum 120 detik.
Contoh mencoba waktu tunggu:
- Upaya 1 → Tunggu 2 detik
- Upaya 2 → Tunggu selama 4 detik
- Upaya 3 → Tunggu 8 detik
- (hingga maksimum 120 detik)
Lanjutkan aliran
Setelah upaya baru yang berhasil, writer.awaitTermination()
Memungkinkan pekerjaan streaming untuk terus bekerja.
Strategi alternatif Untuk manajemen kompetisi meja delta
Selain resolusi konflik berdasarkan mencoba lagi, Danau Delta menawarkan teknik tambahan:
1. Kontrol Kompetisi Optimis (OCS)
Danau Delta memeriksa konflik sebelum mengatur transaksi. Jika ada konflik, itu akan secara otomatis mencoba operasi.
2. Partisi data untuk isolasi
Operasi penulisan harus ditargetkan pada partisi lain untuk menghindari tabrakan.
df.write.format("delta").mode("overwrite").option("replaceWhere", "date="2025-02-17"").save("/mnt/delta/table")
Ini membatasi pembaruan dari satu partisi, mengurangi afirmasi.
3. Streaming dan Optimasi Mandiri
Memungkinkan Auto-Optimize
Dan Auto-Compact
::
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
Perubahan konfigurasi ini mengurangi file kecil dan meningkatkan kinerja penulisan simultan.
4. Gabungkan berdasarkan USERTS
Alih -alih sisipan langsung, gunakan MERGE
Untuk mengelola konflik:
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/mnt/delta/table_name")
delta_table.alias("target").merge(
df.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(set={"target.value": "source.value"}).whenNotMatchedInsertAll().execute()
Proses ini menjamin resolusi konflik di jalur.
Pantau masalah kompetisi debugging
1. Periksa riwayat transaksi pemuatan data:
DESCRIBE HISTORY delta.`/mnt/delta/table_name`;
2. Tampilkan / periksa kunci aktif:
SHOW TBLPROPERTIES delta.`/mnt/delta/table_name`;
3. Aktifkan perubahan data perubahan (CDF) untuk memantau perubahan:
ALTER TABLE delta.`/mnt/delta/table_name` SET TBLPROPERTIES ('delta.enableChangeDataFeed' = true);
Kesimpulan
Tabel delta memungkinkan entri simultan dengan jaminan asam, tetapi konflik dimungkinkan.
- Strategi yang didasarkan pada mencoba mencobanya dengan kemunduran eksponensial membantu mengurangi masalah kompetisi.
- Partisi,
MERGE
DanAuto-Optimize
Juga meningkatkan kinerja penulisan simultan. - Ikuti mekanisme -up seperti
DESCRIBE HISTORY
DanCDF
Ikuti konflik.
Dengan bergabung dengan praktik terbaik ini, proses ini dapat secara efektif melakukan entri delta simultan, mempertahankan integritas data dan mendapatkan optimasi kinerja.
[ad_2]