نسق خطوط أنابيب XGBoost ML مع تدفقات عمل Amazon المُدارة لتدفق Apache Airflow

عقدة المصدر: 1858893

أصبحت القدرة على توسيع نطاق عمليات التعلم الآلي (MLOps) في المؤسسة بسرعة ميزة تنافسية في الاقتصاد الحديث. عندما بدأت الشركات في الانخراط في تعلم الآلة، كان التركيز فقط على حالات الاستخدام ذات الأولوية القصوى. تطالب الشركات الآن بالمزيد من ممارسي التعلم الآلي: المزيد من الميزات الذكية، وتقديمها بشكل أسرع، وصيانتها باستمرار مع مرور الوقت. تتطلب استراتيجية MLOps الفعالة نظامًا أساسيًا موحدًا يمكنه تنسيق وأتمتة معالجة البيانات المعقدة ومهام التعلم الآلي، ويتكامل مع أحدث الأدوات لإكمال هذه المهام على أفضل وجه.

يوضح هذا المنشور قيمة الاستخدام تدفقات عمل أمازون المدارة لتدفق أباتشي (Amazon MWAA) لتنظيم خط أنابيب ML باستخدام البرنامج الشائع XGBoost خوارزمية (تعزيز التدرج الشديد). للحصول على إمكانات MLOps أكثر تقدمًا وشمولاً، بما في ذلك إطار عمل تنسيق النموذج المبني لهذا الغرض وخدمة التكامل المستمر والتسليم المستمر (CI/CD) لتعلم الآلة، ننصح القراء بمراجعة خطوط أنابيب Amazon SageMaker.

لماذا تدفق الهواء للتنسيق

يختار العملاء أباتشي تدفق الهواء وعلى وجه التحديد Amazon MWAA لعدة أسباب، ولكن ثلاثة منها تبرز:

  • يعتمد تدفق الهواء على لغة بايثون - تتمتع Airflow، باعتبارها أداة تعتمد على لغة Python، بفوائد نموذج البرمجة الحتمية. وهذا يمكّن المطورين من تحديد برمجياً كيف المهام التي يتعين القيام بها. الأدوات التصريحية، مثل وظائف خطوة AWS، اسمح لك فقط بالتعريف ماذا يجب القيام به. عند تنسيق مسارات تعلم الآلة، غالبًا ما تكون القدرة على تحديد تدفق التحكم بشكل مباشر مطلوبة للتنقل في مسارات العمل المعقدة.
  • إدارة سير عمل الرسم البياني الحلقي الموجه (DAG). - يوفر Airflow واجهة DAG كآلية بسيطة لتحديد وتشغيل عمليات سير العمل المعقدة مع التبعيات. يتم تصور سير عمل DAG من خلال واجهة المستخدم الرسومية لإدارة العمليات.
  • المدودية قابلى المد - يوفر مشغلو تدفق الهواء طريقة منظمة لأداء المهام الشائعة باستخدام وحدات قابلة لإعادة الاستخدام. هذه الإمكانية قابلة للتوسيع ويتمتع مقدمو الخدمة بالحرية في تطوير مشغلي Airflow المخصصين الذين يتكاملون مع أدواتهم وخدماتهم. يتم دعم العديد من الخدمات السحابية. توفر عوامل التشغيل هذه تجريدًا مفيدًا وقابلية للتكرار وواجهة برمجة التطبيقات (API). في سياق البيانات الضخمة وتعلم الآلة، تعتبر هذه العوامل ذات قيمة خاصة لأنها توفر طريقة لتنسيق خطوط أنابيب البيانات طويلة المدى أحيانًا أو عمليات تعلم الآلة غير المتزامنة مثل التدريب النموذجي.

قم بإعداد بيئة Amazon MWAA

لإنشاء بيئة Amazon MWAA الخاصة بك، أكمل الخطوات التالية:

  1. في وحدة تحكم Amazon MWAA ، اختر خلق البيئة.
  2. في حالة الاسم، أدخل اسمًا فريدًا.
  3. في حالة نسخة تدفق الهواء، اختر الإصدار الذي تريد استخدامه. في هذا المنشور، نستخدم Airflow v2.0.2. نقوم أيضًا بتضمين رمز لـ Airflow v1.10.12.

  1. في مجلة رمز Dag في Amazon S3 قسم ، حدد خدمة تخزين أمازون البسيطة (Amazon S3) حيث يمكن لـ Amazon MWAA العثور على DAGs، plugins.zip ملف و requirements.txt ملف.

تكوين تدفق الهواء لـ XGBoost

يتطلب نموذج XGBoost تكوينًا محددًا في بيئة تدفق الهواء المُدار. ال core.enable_xcom_pickling يجب تعيين المعلمة على True. والسبب في ذلك هو أن نموذج XGBoost المُدرب يحتاج إلى إجراء تسلسل لحفظه كملف في Amazon S3. لا يمكن إجراء تسلسل لبعض كائنات Python (مثل التاريخ والوقت) دون تحويل التسلسل الهرمي لكائنات Python إلى دفق بايت من خلال عملية تسمى تخليل.

ملف المتطلبات.txt

تحميل requirements.txt الملف إلى موقع Amazon S3 الذي حددته في إعداد Amazon MWAA. لدعم هذه التظاهرة، قام requirements.txt يجب أن يحتوي الملف على الإدخالات التالية:

boto3==1.17.49
sagemaker==1.72.0
s3fs==0.5.1

قم بتنسيق خط أنابيب XGBoost ML

خط أنابيب ML الخاص بنا عبارة عن خط أنابيب مبسط من ثلاث خطوات:

  1. المعالجة المسبقة للبيانات باستخدام غراء AWS. قد تتطلب خطوط الأنابيب الحقيقية العديد من خطوات المعالجة لتنظيف البيانات وإبراز الهندسة. بالرغم من خطوط أنابيب Amazon SageMaker يوفر وظيفة مماثلة، فنحن نستخدم AWS Glue لتوضيح كيفية تنسيق خدمات AWS المختلفة أو أدوات وخدمات الطرف الثالث في مسار واحد.
  2. قم بتدريب نموذج XGBoost باستخدام مهمة تدريب SageMaker.
  3. انشر النموذج المُدرب كنقطة نهاية للاستدلال في الوقت الفعلي.

هندسة الحل

يظهر خط أنابيب ML الخاص بنا في الرسم البياني التالي. نحن نستخدم AWS لامدا إلى استدعاء DAGs باستخدام وظيفة Lambda. نحن نستخدم أيضا أمازون إيفينت بريدج لتشغيل وظائف Lambda. لمزيد من المعلومات، راجع البرنامج التعليمي: جدولة وظائف AWS Lambda باستخدام EventBridge.

قم بإعداد البرنامج النصي AWS Glue

في العرض التوضيحي الخاص بنا، نقوم بإنشاء مهمة AWS Glue ديناميكيًا باستخدام البرنامج النصي PySpark المحفوظ في Amazon S3. انسخ ال glue_etl.py الملف المقدم في الريبو كود المصدر إلى موقع Amazon S3.

قم بتعيين قيم تكوين DAG

لتبسيط الأمور، نستخدم config.py ملف لاستيراد أي تكوينات خاصة بالبيئة بدلاً من تعريفه في البرنامج النصي DAG الرئيسي. يمكنك مشاهدة config.py الملف بكامله على GitHub جيثب:. أفضل الممارسات هي الاستخدام مدير أسرار AWS لتخزين معلومات التكوين والأسرار (حتى كتابة هذه السطور، متجر معلمات مدير أنظمة AWS ليست واجهة خلفية مدعومة على Amazon MWAA). تتوفر وثائق تفصيلية حول كيفية تخزين الأسرار بشكل آمن في AWS Secrets Manager for Amazon MWAA هنا.

تحميل المحدثة config.py الملف إلى دليل DAG.

قم بتجهيز بيانات التدريب الخاصة بالعميل

تم ذكر مجموعة بيانات العملاء في الكتاب اكتشاف المعرفة في البيانات بواسطة دانيال ت. لاروز. ينسبها المؤلف إلى مستودع مجموعات بيانات التعلم الآلي بجامعة كاليفورنيا في إيرفين. مجموعة البيانات متاحة للجمهور ومقدمة في جيثب ريبو.

تحميل customer-churn.csv الملف إلى موقع Amazon S3 الذي حددته في ملف config.py ملف.

بناء داغ

في عرضنا التوضيحي، تتكون DAG من أربعة أقسام أساسية:

  • استيراد البيانات
  • تكوين مشغل DAG
  • تعريفات مهمة DAG
  • تعريف تبعية مهمة DAG

استيراد البيانات

نظرًا لأن Airflow يعتمد على Python، فإن ملف DAG هو ملف Python بسيط ويتم استيراد الوحدات النمطية لـ Airflow تمامًا كما هو الحال مع أي تطبيق Python.

تحتوي بعض الخدمات على مشغلي Airflow الأصليين الذين يديرون مكالمات API غير المتزامنة والاستقصاء لتحديد نجاح أو فشل المهام المنسقة. نوصي باستخدام عوامل التشغيل الأصلية حيثما أمكن ذلك. خدمات AWS التي لا تحتوي على مشغلي Airflow الأصليين، مثل AWS Glue، لا يزال من الممكن تنظيمها في Airflow باستخدام AWS SDKs التي يتم استدعاؤها من العام PythonOperator.

بالنسبة لجميع خدمات AWS تقريبًا، فإن AWS SDK لـ Python (Boto3) يوفر الوصول على مستوى الخدمة إلى واجهات برمجة التطبيقات. يوفر SDK هذا درجة عالية من التحكم، ولكنه يوفر أيضًا مستوى أقل من التجريد. بالنسبة لخطوط تعلم الآلة التي تستخدم SageMaker، يمكنك استخدام ملف SageMaker بيثون SDK. هذا عبارة عن حزمة SDK مبسطة تم استخلاصها خصيصًا لتجربة تعلم الآلة.

تتضمن بيانات الاستيراد التالية وحدات Airflow العامة ومشغليها، ومشغلي Airflow الأصليين لـ SageMaker، وBoto3 وSageMaker SDK:

# Airflow Operators
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator # Airflow Sagemaker Operators
from airflow.providers.amazon.aws.operators.sagemaker_training import SageMakerTrainingOperator
from airflow.providers.amazon.aws.operators.sagemaker_endpoint import SageMakerEndpointOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook # AWS SDK for Python
import boto3 # Amazon SageMaker SDK
import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator
from sagemaker.session import s3_input # Airflow SageMaker Configuration
from sagemaker.workflow.airflow import training_config
from sagemaker.workflow.airflow import model_config_from_estimator
from sagemaker.workflow.airflow import deploy_config_from_estimator # Configuration variables
import config

هناك حاجة إلى بيانات الاستيراد الأخرى لدعم هذا العرض التوضيحي؛ الرجوع إلى جيثب ريبو للحصول على الكود الكامل.

تكوين مشغل DAG

يتم تحديد مهام DAG وDAG بناءً على عوامل التشغيل التي تم استدعاؤها لتشغيل كل مهمة.

بالنسبة لمهمة AWS Glue، فإننا نستدعي الأمر PythonOperator استخدام SDK لـ Python لإنشاء عميل لـ AWS Glue. للحفاظ على كود DAG مرتبًا، نقوم بتجريد كود عميل AWS Glue في وظيفة مساعدة تسمى preprocess_glue. نحن ننظم glue_etl.py (المشار إليه في جيثب ريبو) في Amazon S3 حتى يمكن تحميله عند إنشاء مهمة AWS Glue. انظر الكود التالي:

def preprocess_glue(): """preprocess data using glue for etl""" # not best practice to hard code location glue_script_location = 's3://{}/{}'.format(config.GLUE_JOB_SCRIPT_S3_BUCKET, config.GLUE_JOB_SCRIPT_S3_KEY) glue_client = boto3.client('glue') # instantiate the Glue ETL job response = glue_client.create_job( Name=glue_job_name, Description='PySpark job to extract the data and split in to training and validation data sets', Role=config.GLUE_ROLE_NAME, ExecutionProperty={ 'MaxConcurrentRuns': 2 }, Command={ 'Name': 'glueetl', 'ScriptLocation': glue_script_location, 'PythonVersion': '3' }, DefaultArguments={ '--job-language': 'python' }, GlueVersion='1.0', WorkerType='Standard', NumberOfWorkers=2, Timeout=60 ) # execute the previously instantiated Glue ETL job response = glue_client.start_job_run( JobName=response['Name'], Arguments={ '--S3_SOURCE': config.DATA_S3_SOURCE, '--S3_DEST': config.DATA_S3_DEST, '--TRAIN_KEY': 'train/', '--VAL_KEY': 'validation/' } )

نقوم بإنشاء وظيفة مساعدة تُرجع ARN لدور SageMaker:

def get_sagemaker_role_arn(role_name, region_name): iam = boto3.client("iam", region_name=region_name) response = iam.get_role(RoleName=role_name) return response["Role"]["Arn"]

يتطلب مُقدِّر XGBoost دور SageMaker وصورة الحاوية والمعلمات الفائقة، والتي نجمعها باستخدام رابط في SageMaker:

hook = AwsBaseHook(aws_conn_id="airflow-sagemaker", client_type="sagemaker")
sess = hook.get_session(region_name=config.REGION_NAME)
sagemaker_role = get_sagemaker_role_arn(config.SAGEMAKER_ROLE_NAME, config.REGION_NAME)
container = get_image_uri(sess.region_name, "xgboost")
hyperparameters = { "max_depth":"5", "eta":"0.2", "gamma":"4", "min_child_weight":"6", "subsample":"0.8", "objective":"binary:logistic", "num_round":"100"
}

مع تحديد المعلمات، يمكننا إنشاء كائن المقدر:

xgb_estimator = Estimator( image_name=container, hyperparameters=hyperparameters, role=sagemaker_role, sagemaker_session=sagemaker.session.Session(sess), train_instance_count=1, train_instance_type='ml.m5.4xlarge', train_volume_size=5, output_path=config.SAGEMAKER_MODEL_S3_DEST
)

يعد كائن المقدر هذا معلمة إدخال في تكوين التدريب. نحن بحاجة إلى تحديد معلمات التدريب الأخرى:

# create unique name with guid
sagemaker_taining_job_name=config.SAGEMAKER_TRAINING_JOB_NAME_PREFIX+'-{}'.format(guid) # define S3 locations for training & validation data processed using Glue
sagemaker_training_data = s3_input(config.SAGEMAKER_TRAINING_DATA_S3_SOURCE, content_type=config.SAGEMAKER_CONTENT_TYPE)
sagemaker_validation_data = s3_input(config.SAGEMAKER_VALIDATION_DATA_S3_SOURCE, content_type=config.SAGEMAKER_CONTENT_TYPE) sagemaker_training_inputs = { 'train': sagemaker_training_data, 'validation': sagemaker_validation_data }

دعونا نلقي نظرة فاحصة على الحجج ل sagemaker_training_inputs. تدعم خوارزمية XGBoost كلاً من تنسيقات النصوص LIBSVM وCSV لمجموعات بيانات التدريب والتحقق من الصحة. ومع ذلك، يتم دعم LIBSVM بشكل افتراضي. هذا يعني أنه يجب علينا تحديد ملف CSV بشكل صريح حتى يفسر XGBoost بياناتنا بشكل صحيح. تم تعيين نوع المحتوى كـ text/csv في ملف تكوين DAG المخصص لدينا. نحن نستخدم ملف CSV لأنه تنسيق ملفات البيانات الأكثر شيوعًا والمألوف لدى جميع ممارسي تعلم الآلة.

من خلال تحديد هذه المعلمات، يمكننا إنشاء كائن تكوين التدريب:

training_config = training_config( estimator=xgb_estimator, inputs=sagemaker_training_inputs, job_name=sagemaker_taining_job_name
)

بالنسبة لمشغلي Airflow SageMaker الأصليين، يمكنك إنشاء كائنات تكوين محددة جيدًا والإشارة إليها عند استدعاء المشغلين.

تعريف التكوين التالي مخصص لنقطة نهاية SageMaker:

# create unique name using guid
sagemaker_model_name=config.SAGEMAKER_MODEL_NAME_PREFIX+'-{}'.format(guid)
sagemaker_endpoint_name=config.SAGEMAKER_ENDPOINT_NAME_PREFIX+'-{}'.format(guid)

بالنسبة لهذا الخط البسيط، نستخدم deploy_config_from_estimator خيار واجهة برمجة التطبيقات (API) في SageMaker SDK لتصدير تكوين نشر Airflow مباشرة من مُقدِّر SageMaker XGBoost ( endpoint_name يجب أن تكون المعلمة 63 حرفًا أو أقل):

endpoint_config = deploy_config_from_estimator( estimator=xgb_estimator, task_id="train", task_type="training", initial_instance_count=1, instance_type="ml.m4.xlarge", model_name=sagemaker_model_name, endpoint_name=sagemaker_endpoint_name
)

لمزيد من المعلومات حول كيفية إعداد تدريب النموذج وتكوين النشر، بما في ذلك كيفية استخدامنا لـ SageMaker SDK sagemaker.workflow.airflow واجهات برمجة التطبيقات، راجع جيثب ريبو.

مع اكتمال تكوين المشغل، نحن على استعداد لتجميع كل ذلك معًا لتحديد DAG الخاص بنا.

تعريفات مهمة DAG

بالنسبة لمهمة التدريب على نموذج XGBoost، فإننا نستدعي الأمر SageMakerTrainingOperator. بالنسبة لمهمة نشر نقطة النهاية، نقوم باستدعاء SageMakerEndpointOperator. من المهم ملاحظة الفصل بين الاهتمامات: نقوم بإنشاء نموذج باستخدام SageMakerModelOperator ولكن قم بتكوين نقطة نهاية SageMaker باستخدام SageMakerEndpointConfigOperator. يوفر هذا تحكمًا تفصيليًا إضافيًا في إنشاء النموذج ونشره. انظر الكود التالي:

args = {"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2), 'depends_on_past': False} with DAG( dag_id=config.AIRFLOW_DAG_ID, default_args=args, start_date=days_ago(2), schedule_interval=None, concurrency=1, max_active_runs=1,
) as dag: process_task = PythonOperator( task_id="process", dag=dag, #provide_context=False, python_callable=preprocess_glue, ) train_task = SageMakerTrainingOperator( task_id = "train", config = training_config, aws_conn_id = "airflow-sagemaker", wait_for_completion = True, check_interval = 60, #check status of the job every minute max_ingestion_time = None, #allow training job to run as long as it needs, change for early stop ) endpoint_deploy_task = SageMakerEndpointOperator( task_id = "endpoint-deploy", config = endpoint_config, aws_conn_id = "sagemaker-airflow", wait_for_completion = True, check_interval = 60, #check status of endpoint deployment every minute max_ingestion_time = None, operation = 'create', #change to update if you are updating rather than creating an endpoint )

تعريف تبعية مهمة DAG

بعد أن نحدد المهام، نقوم بتعيين تبعيات المهام. ينفذ تدفق الهواء عامل التشغيل المنطقي الصحيح (>>) لتحديد التبعيات النهائية والمشغل المنطقي للتحول الأيسر (<<) لتحديد تبعيات المنبع. في مثالنا، نحدد فقط التبعيات النهائية:

# set the dependencies between tasks
process_task >> train_task >> endpoint_deploy_task

عندما يتم تحميل DAG المكتمل إلى موقع Amazon S3 المعين، تقوم Amazon MWAA تلقائيًا باستيعاب DAG. يُظهر عرض الرسم البياني بشكل مرئي تبعيات المهمة. يمكنك تشغيل DAG يدويًا من وحدة التحكم أثناء الاختبار التكراري، أو كما وصفنا سابقًا، من مصدر خارجي مثل EventBridge ووظيفة Lambda. يتم تمييز كل مهمة حسب مرحلة الإنجاز، كما هو موضح في لقطة الشاشة التالية. يشير اللون الأخضر الداكن إلى إكمال المهمة بنجاح.

اختبار نقطة النهاية المنشورة

بعد endpoint-deploy اكتملت المهمة، يمكننا عرض نقطة النهاية على وحدة تحكم SageMaker. نقطة نهاية SageMaker هي نقطة نهاية للاستدلال في الوقت الفعلي. يعتني SageMaker بنشر نقطة نهاية HTTPS واستضافتها وكشفها.

يمكننا اختبار نقطة النهاية المنشورة باستخدام دفتر ملاحظات SageMaker.

اتبع هذه الخطوات لإعداد بيئة دفتر ملاحظات SageMaker:

  1. قم بتشغيل مثيل دفتر ملاحظات SageMaker.
  2. على مثيلات دفتر الملاحظات الصفحة، افتح مثيل دفتر الملاحظات الخاص بك عن طريق اختيار أي منهما افتح JupyterLab لواجهة JupyterLab أو فتح كوكب المشتري لعرض Jupyter الكلاسيكي.
  3. اختار تحميل لاستيراد دفتر الاختبار المتوفر في ملف جيثب ريبو.

قم بإعداد عينة اختبار

نحن نستخدم Pandas DataFrames لإنشاء مجموعة بيانات اختبارية من مجموعة بيانات العملاء التي تم استخدامها للتدريب. بالنسبة لمجموعة بيانات الاختبار، يجب علينا إسقاط عمود التسمية، وهو العمود الأول. نحن أيضًا نأخذ عينة عشوائية من مجموعة البيانات باستخدام طريقة عينة Pandas DataFrame.

طلب الاستنتاجات

الآن بعد أن حصلنا على عينات من بيانات الاختبار، نستخدم مكتبة Boto3 لإنشاء عميل وقت تشغيل SageMaker. نحن نستخدم العميل عندما نستدعي نقطة النهاية الخاصة بنا، ونمرر بيانات الاختبار، ونتلقى قيمة الاستدلال.

وفي الختام

يمكنك استخدام Amazon MWAA لتنسيق وأتمتة خطوط تعلم الآلة المعقدة بدءًا من مرحلة معالجة البيانات وحتى التدريب النموذجي ونشر نقطة النهاية. يمكنك تعيين خيارات تكوين خاصة في بيئة Amazon MWAA لدعم أطر عمل ML الشائعة مثل XGBoost.

في هذا المنشور، أوضحنا كيفية إنشاء مهمة AWS Glue وتشغيلها ديناميكيًا لمعالجة بيانات التدريب والتحقق من الصحة مسبقًا. لقد أظهرنا كيفية إنشاء DAG لدعم خط أنابيب ML هذا، بما في ذلك عبارات الاستيراد وتكوين مشغل DAG وتعريفات مهمة DAG وتعريف تبعية DAG. لقد أظهرنا الفرق بين استخدام مشغلي Airflow الأصليين مقابل استدعاء مكالمات AWS SDK API من عام PythonOperator.

Amazon MWAA عبارة عن أداة تنسيق متعددة الاستخدامات يمكن للمؤسسات استخدامها لتفعيل قدرات التعلم الآلي الخاصة بها وتوسيع نطاقها.


عن المؤلفين

جاستن ليتو هو مهندس الحلول الأول في Amazon Web Services وهو متخصص في تحليلات البيانات الضخمة والتعلم الآلي. شغفه هو مساعدة العملاء على اعتماد السحابة بشكل أفضل. يستمتع في أوقات فراغه بالإبحار بعيدًا عن الشاطئ والعزف على البيانو لموسيقى الجاز. يعيش في مانهاتن مع زوجته فيرا.

ديفيد ايرليش هو متخصص في التعلم الآلي في Amazon Web Services. إنه متحمس لمساعدة العملاء على إطلاق الإمكانات الحقيقية لبياناتهم. يستمتع في أوقات فراغه باستكشاف الأحياء المختلفة في مدينة نيويورك، والذهاب إلى نوادي الكوميديا، والسفر.

شرياس سوبرامانيان هو مهندس حلول متخصص في الذكاء الاصطناعي/تعلم الآلة، ويساعد العملاء باستخدام التعلم الآلي لحل تحديات أعمالهم باستخدام خدمات AWS.

المصدر: https://aws.amazon.com/blogs/machine-learning/orchestrate-xgboost-ml-pipelines-with-amazon-managed-workflows-for-apache-airflow/

الطابع الزمني:

اكثر من AWS مدونة التعلم الآلي