שיפור עיבוד הנתונים עם Spark 3.0 ו- Delta Lake

צומת המקור: 1013539

איסוף, עיבוד וביצוע ניתוח על הזרמת נתונים, בתעשיות כמו אד-טק כרוכה בהנדסת נתונים אינטנסיבית. הנתונים שנוצרים מדי יום הם עצומים (100 אלפי נתונים של GB) ודורשים זמן עיבוד משמעותי כדי לעבד את הנתונים לשלבים הבאים.

אתגר נוסף הוא הצטרפות של מערכי נתונים כדי להפיק תובנות. לכל תהליך בממוצע יש יותר מ-10 מערכי נתונים ומספר שווה של חיבורים עם מספר מפתחות. גודל המחיצה עבור כל מפתח אינו ניתן לחיזוי בכל הפעלה.

ולבסוף, אם כמות הנתונים עולה בהזדמנויות מסוימות, הזיכרון עלול להיגמר באחסון. המשמעות היא שהתהליך ימות באמצע הכתיבה הסופית, מה שיגרום לצרכנים לקרוא בצורה ברורה את מסגרות נתוני הקלט.

בבלוג זה נסקור סקירה של אגמי דלתא, היתרונות שלו וכיצד ניתן להתגבר על האתגרים לעיל על ידי מעבר לאגם Delta והגירה ל-Spark 3.0 מ-Spark 2.4. 

מהו אגם דלתא?

פותח ב-Databricks, "Delta Lake היא שכבת אחסון נתונים בקוד פתוח הפועלת על Data Lake הקיים ומשתפת פעולה באופן מלא עם אפאצ 'י ספארק ממשקי API. יחד עם היכולת ליישם עסקאות ACID וטיפול במטא נתונים מדרגיים, דלתא לייקס יכולה גם לאחד את ההזרמה ועיבוד הנתונים האצווה". 

דלתא לייק משתמשת בקבצי Parquet בגרסה לאחסון נתונים בענן. לאחר הגדרת מיקום הענן, Delta Lake עוקבת אחר כל השינויים שבוצעו בטבלה או בספריית חנות הכתם כדי לספק עסקאות ACID. 

יתרונות השימוש בדלתא לייקס 

Delta lake מאפשר לאלפי נתונים לרוץ במקביל, לתת מענה לאתגרי אופטימיזציה ומחיצות, פעולות מטא-נתונים מהירות יותר, שומרת על יומן עסקאות וממשיכה לעדכן את הנתונים באופן רציף. להלן נדון בכמה יתרונות מרכזיים: 

יומן עסקאות של אגם Delta

יומני עסקאות של אגם Delta הם קובץ הוספה בלבד ומכיל רישום מסודר של כל העסקאות שבוצעו בטבלה של אגם Delta. יומן העסקאות מאפשר למשתמשים שונים לקרוא ולכתוב לטבלה הנתונה במקביל. הוא פועל כמקור אמת יחיד או המאגר המרכזי שמתעד את כל השינויים שבוצעו בטבלה על ידי המשתמש. הוא שומר על אטומיות וצופה ברציפות בעסקאות המבוצעות באגם דלתא.

כפי שהוזכר לעיל, Spark בודק את יומן הדלתא עבור כל עסקאות חדשות, ולאחר מכן Delta Lake מבטיחה שגרסת המשתמש תמיד מסונכרנת עם רשומת האב. זה גם מבטיח שלא יבוצעו שינויים סותרים בטבלה. אם התהליך קורס לפני עדכון יומן הדלתא, הקבצים לא יהיו זמינים לאף תהליכי קריאה מכיוון שהקריאות תמיד עוברות ביומן העסקאות.


יומן עסקאות עבודה והתחייבויות אטומיות

אגם דלתא עושה מחסום על כל עשר פעולות. קובץ המחסום מכיל את המצב הנוכחי של הנתונים בפורמט Parquet שניתן לקרוא במהירות. כאשר משתמשים מרובים מנסים לשנות את הטבלה בו-זמנית, Delta Lake פותר את הקונפליקטים באמצעות בקרת מקבילות אופטימית.

הסכימה של המטא נתונים היא כדלקמן: 

טור סוּג תיאור
פוּרמָט מחרוזת פורמט הטבלה, כלומר "דלתא".
id מחרוזת מזהה ייחודי של הטבלה
שם מחרוזת שם הטבלה כפי שהוגדרה ב- metastore
תיאור מחרוזת תיאור הטבלה.
מיקום מחרוזת מיקום השולחן
נוצר ב חותם כאשר הטבלה נוצרה
שונה לאחרונה חותם מתי הטבלה שונתה לאחרונה
מחיצהעמודות מערך מיתרים שמות של עמודות המחיצה אם הטבלה מחולקת
numFiles ארוך מספר הקבצים בגרסה העדכנית ביותר של הטבלה
נכסים מפת מחרוזת כל המאפיינים שהוגדרו לטבלה זו
minReaderVersion int גרסה מינימלית של קוראים (לפי פרוטוקול היומן) שיכולים לקרוא את הטבלה.
minWriterVersion int גרסה מינימלית של קוראים (לפי פרוטוקול היומן) שיכולים לכתוב לטבלה.
מקור: GitHub

הוסף והסר קובץ

בכל פעם שמתווסף קובץ או מסירים קובץ קיים, פעולות אלו נרשמות ביומן. נתיב הקובץ הוא ייחודי ונחשב כמפתח העיקרי לקבוצת הקבצים שבתוכו. כאשר קובץ חדש נוסף על נתיב שכבר קיים בטבלה, סטטיסטיקות ומטא נתונים אחרים על הנתיב מתעדכנים מהגרסה הקודמת. באופן דומה, פעולת ההסרה מסומנת על ידי חותמת זמן. פעולת הסרה נשארת בטבלה כמצבה עד שתפוגה. מצבה פג כאשר TTL (Time-To-Live) חורג.

מכיוון שפעולות בתוך קובץ דלתא נתון לא מובטחות שיושמו לפי הסדר, זה לא תקף עבור פעולות קבצים מרובות עם אותו נתיב להתקיים בגרסה אחת.

ניתן להגדיר את דגל ה-dataChange ב-'הוסף' או 'הסר' ל-false כדי למזער את התנגשויות הפעולות במקביל.

הסכימה של פעולת ההוספה היא כדלקמן:

שם שדה סוג מידע תיאור
נתיב מחרוזת נתיב יחסי, משורש הטבלה, לקובץ שאמור להתווסף לטבלה
partitionValues מפה[String,String] מפה מעמודת מחיצה לערך עבור קובץ זה. 
גודל ארוך גודל הקובץ הזה בבתים
זמן שינוי ארוך הזמן שבו הקובץ הזה נוצר, כאלפיות שניות מאז התקופה
שינוי נתונים בוליאני כאשר false הקובץ כבר חייב להיות קיים בטבלה או שהרשומות בקובץ שנוסף חייבות להיכלל בפעולת הסרה אחת או יותר באותה גרסה
סטטיסטיקות מבנה סטטיסטיקה מכיל נתונים סטטיסטיים (לדוגמה, ספירה, ערכי מינימום/מקסימום עבור עמודות) לגבי הנתונים בקובץ זה
תיוגים מפה[String,String] מפה המכילה מטא נתונים על קובץ זה

הסכימה של פעולת ההסרה היא כדלקמן:

שם שדה נתונים סוּג תיאור
נתיב מחרוזת נתיב מוחלט או יחסי לקובץ שיש להסיר מהטבלה
מחיקה חותמת זמן ארוך הזמן שבו התרחשה המחיקה, מיוצג באלפיות שניות מאז התקופה
שינוי נתונים בוליאני כאשר False הרשומות בקובץ שהוסר חייבות להיכלל בפעולת הוספה אחת או יותר באותה גרסה
extendedFileMetadata בוליאני כאשר נכון, השדות partitionValues, גודל ותגים קיימים
partitionValues מפה[מחרוזת, מחרוזת] מפה מעמודת מחיצה לערך עבור קובץ זה. ראה גם סדרת ערכי מחיצה
גודל ארוך גודל הקובץ הזה בבתים
תיוגים מפה[מחרוזת, מחרוזת] מפה המכילה מטא נתונים על קובץ זה
מקור: GitHub

הסכימה של המטא-נתונים מכילה את נתיב הקובץ בכל פעולת הוספה/הסרה ותהליך הקריאה של Spark אינו צריך לבצע סריקה מלאה כדי לקבל את רשימות הקבצים.

אם כתיבה נכשלת מבלי לעדכן את יומן העסקאות, מכיוון שקריאת הצרכן תעבור תמיד דרך המטא נתונים, יתעלמו מהקבצים הללו. 

יתרונות המעבר ל-Spark 3.0

מלבד מינוף היתרונות של Delta Lake, המעבר ל-Spark 3.0 שיפר את עיבוד הנתונים בדרכים הבאות:

אופטימיזציית הצטרפות מוטה

הטיית נתונים הוא מצב שבו נתוני הטבלה מחולקים בצורה לא אחידה בין המחיצות באשכול ועלולים לשדרג לאחור בצורה חמורה את הביצועים של שאילתות, במיוחד אלה עם חיבורים. עיוות יכול להוביל לחוסר איזון קיצוני באשכול ובכך להגדיל את זמן עיבוד הנתונים.

ניתן לטפל במצב הטיית הנתונים בעיקר על ידי שלוש גישות.

  1. שימוש בתצורה "spark.sql.shuffle.partitions" להגברת ההקבלה על נתונים מפוזרים באופן שווה יותר.
  2. הגדלת סף הצטרפות הגיבוב לשידור באמצעות התצורה spark.sql.autoBroadcastJoinThreshold לגודל המקסימלי בבייטים עבור הטבלה שיש לשדר לכל צמתי העבודה במהלך ביצוע הצטרפות.
  3. המלחת מפתחות (הוסף קידומת למפתחות המוטות כדי להפוך את אותו מפתח לשונה ולאחר מכן התאם את התפלגות הנתונים).

Spark 3.0 הוסיפה אופטימיזציה לטיפול אוטומטי ב-Skew join המבוסס על נתונים סטטיסטיים של זמן הריצה עם מסגרת הביצוע האדפטיבית החדשה.

מצב מחיצה מוטה

לאתגר של מחיצות מוטות שהיה קיים בגרסה הקודמת של Spark 2.4 הייתה השפעה עצומה על זמן הרשת וזמן הביצוע של משימה מסוימת. יתרה מכך, השיטות להתמודד איתו היו לרוב ידניות. Spark 3.0 מתגבר על האתגרים הללו.

למחיצה המוטה תהיה השפעה על תעבורת הרשת ועל זמן ביצוע המשימה, מכיוון שלמשימה הספציפית הזו יהיו הרבה יותר נתונים לעבד. אתה גם צריך לדעת איך זה משפיע על אבטחת הסייבר, שכן נפח התעבורה ברשת הוא משהו שהאקרים מנצלים אותו.

מחיצת ההצטרפות המוטה מחושבת לפי גודל הנתונים וספירת השורות מתוך הנתונים הסטטיסטיים של מפת זמן הריצה.

אופטימיזציה

הסתגלות מ:אפאצ'י ספארק ג'ירה

מהטבלה שלמעלה, מסעות הפרסום של Dataframe מצטרפים לארגוני Dataframe. אחת המחיצות (מחיצה 0) של ארגונים גדולה ומעוותת. מחיצה 0 היא תוצאה של 9 מפות מהשלב הקודם (מפה-0 עד מפה-8). כלל OptimizeSkewedJoin של Spark יפצל את המחיצה ל-3 ולאחר מכן יצור 3 משימות נפרדות שכל אחת מהן היא מחיצה חלקית ממחיצה 0 (Map-0 למפה-2, Map-3 to Map-5, ומפה-6 עד Map-9) ומצטרף למחיצת מסעות הפרסום 0. גישה זו גורמת לעלות נוספת על ידי קריאת מחיצה 0 של מסעות פרסום בטבלה השווה למספר המחיצות החלקיות מארגון הטבלה.

תוצאה סופית

באמצעות Delta Lake ו-Spark 3.0, אפשרנו את התוצאות הבאות עבור חברת טכנולוגיית הפרסום:

  • זמן עיבוד הנתונים הצטמצם מ-15 שעות ל-5-6 שעות
  • הפחתה של 50% בעלות AWS EMR
  • מניעת אובדן נתונים ומוות של תהליכים שהיה אירוע תדיר כאשר המערכת יצאה מהזיכרון או שהעיבוד הופסק עקב תקלה במערכת
  • תכונות ניטור והתראה הותקנו כדי להודיע ​​במקרה שהתהליך נכשל
  • תזמור שלם באמצעות Airflow להשגת אוטומציה מלאה וניהול תלות בין תהליכים

מקור: https://www.smartdatacollective.com/improving-data-processing-with-spark-3-delta-lake/

בול זמן:

עוד מ קולקטיב SmartData