Procesamiento paralelo de archivos grandes en Python

Procesamiento paralelo de archivos grandes en Python

Nodo de origen: 1970820

Procesamiento paralelo de archivos grandes en Python
Imagen del autor
 

Para el procesamiento paralelo, dividimos nuestra tarea en subunidades. Aumenta el número de trabajos procesados ​​por el programa y reduce el tiempo total de procesamiento. 

Por ejemplo, si está trabajando con un archivo CSV grande y desea modificar una sola columna. Introduciremos los datos como una matriz en la función, y procesará en paralelo varios valores a la vez en función del número de valores disponibles.  los trabajadores.. Estos trabajadores se basan en la cantidad de núcleos dentro de su procesador. 
 

Nota: el uso de procesamiento paralelo en un conjunto de datos más pequeño no mejorará el tiempo de procesamiento.

 

En este blog, aprenderemos cómo reducir el tiempo de procesamiento en archivos grandes usando multiprocesamiento, libreta de trabajoy tqdm Paquetes de Python. Es un tutorial simple que se puede aplicar a cualquier archivo, base de datos, imagen, video y audio. 
 

Nota: estamos usando el cuaderno Kaggle para los experimentos. El tiempo de procesamiento puede variar de una máquina a otra.  

 

Usaremos el Accidentes en EE. UU. (2016 - 2021) conjunto de datos de Kaggle que consta de 2.8 millones de registros y 47 columnas. 

Nosotros importaremos multiprocessing, jobliby tqdm para procesamiento paralelo, pandas para ingestiones de datosy re, nltky string para procesamiento de texto

# Computación paralela
importar multiprocesamiento as mp
en libreta de trabajo importar Paralelo, retrasado
en tqdm.cuaderno importar tqdm # Ingestión de datos 
importar Los pandas as pd # Procesamiento de texto 
importar re en nltk.corpus importar Para las palabras
importar cadena

Antes de saltar directamente, establezcamos n_workers al duplicar cpu_count(). Como puede ver, tenemos 8 trabajadores.

n_trabajadores = 2 * mp.cpu_count() print(f"{n_trabajadores} trabajadores disponibles") >>> 8 trabajadores están disponibles

En el siguiente paso, incorporaremos grandes archivos CSV usando el Los pandas read_csv función. Luego, imprima la forma del marco de datos, el nombre de las columnas y el tiempo de procesamiento. 

Nota: Función mágica de Jupyter %%time puede mostrar tiempos de CPU y tiempo de pared Al final del proceso. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}nnColumn Names:n{df.columns}n")

Salida

Forma:(2845342, 47) Nombres de columna: Índice(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Descripción', 'Número', 'Calle', 'Lado', 'Ciudad', 'Condado', 'Estado', 'Código postal', 'País', 'Zona horaria', 'Código_aeropuerto', 'Tiempo_marca de tiempo', 'Temperatura (F)', 'Wind_Chill (F)', 'Humedad (%)', 'Presión (in)', 'Visibilidad (mi)', 'Wind_Direction', 'Wind_Speed ​​(mph)', 'Precipitación (in )', 'Condición_climática', 'Servicio', 'Bump', 'Cruce', 'Ceder_camino', 'Cruce', 'Sin salida', 'Ferrocarril', 'Rotonda', 'Estación', 'Parada', 'Calmar_tráfico' , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') Tiempos de CPU: usuario 33.9 s, sys: 3.93 s, total: 37.9 s Tiempo de pared : 46.9 s

El clean_text es una función sencilla para procesar y limpiar el texto. conseguiremos ingles Para las palabras usando nltk.copus utilícelo para filtrar las palabras vacías de la línea de texto. Después de eso, eliminaremos los caracteres especiales y los espacios adicionales de la oración. Será la función de línea de base para determinar el tiempo de procesamiento para de serie, paraleloy lote procesar. 

def texto_limpio(texto): # Eliminar palabras vacías paradas = stopwords.words("english") text = " ".join([palabra para por el temor  in texto.split() if por el temor  no in se detiene]) # Eliminar caracteres especiales texto = texto.translate(str.maketrans('', '', cadena.puntuación)) # eliminando los espacios extra texto = re.sub(' +',' ', texto) volvemos texto

Para el procesamiento en serie, podemos usar los pandas .apply() función, pero si desea ver la barra de progreso, debe activar tqdm para Los pandas y luego usar el .progress_apply() función. 

Vamos a procesar los 2.8 millones de registros y guardar el resultado en la columna de la columna "Descripción". 

%%time tqdm.pandas() df['Descripción'] = df['Descripción'].progress_apply(clean_text)

Salida

Le tomó 9 minutos y 5 segundos para el de gama alta procesador para procesar en serie 2.8 millones de filas. 

100 % 2845342/2845342 [09:05<00:00, 5724.25 it/s] Tiempos de CPU: usuario 8 min 14 s, sys: 53.6 s, total: 9 min 7 s Tiempo de pared: 9 min 5 s

Hay varias formas de procesar el archivo en paralelo, y vamos a aprender sobre todas ellas. El multiprocessing es un paquete de python incorporado que se usa comúnmente para el procesamiento paralelo de archivos grandes. 

Crearemos un multiprocesamiento Piscina 8 trabajadores y utilice el mapa función para iniciar el proceso. Para mostrar las barras de progreso, estamos usando tqdm.

La función map consta de dos secciones. El primero requiere la función y el segundo requiere un argumento o lista de argumentos. 

Aprende más leyendo documentación

%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text,tqdm(df['Description']))

Salida

Hemos mejorado nuestro tiempo de procesamiento en casi 3X. El tiempo de procesamiento se redujo de 9 minutos 5 segundos a 3 minutos 51 segundos.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] Tiempos de CPU: usuario 5.68 s, sys: 1.56 s, total: 7.23 s Tiempo de pared: 3 min 51 s

Ahora aprenderemos sobre otro paquete de Python para realizar procesamiento en paralelo. En esta sección, usaremos joblib's Paralelo y retrasado para replicar el mapa función. 

  • Parallel requiere dos argumentos: n_jobs = 8 y backend = multiprocesamiento.
  • Luego, agregaremos texto_limpio  En el correo electrónico “Su Cuenta de Usuario en su Nuevo Sistema XNUMXCX”. retrasado función. 
  • Cree un bucle para alimentar un solo valor a la vez. 

El proceso a continuación es bastante genérico y puede modificar su función y matriz según sus necesidades. Lo he usado para procesar miles de archivos de audio y video sin ningún problema. 

Recomendado: agregar manejo de excepciones usando try: y except:

def texto_paralelo_limpio(matriz): resultado = Parallel(n_jobs=n_workers,backend="multiprocessing"))(retrasado(clean_text) (texto) para texto in tqdm(matriz) ) volvemos resultado

Agregue la columna "Descripción" a text_parallel_clean()

%%time df['Descripción'] = text_parallel_clean(df['Descripción'])

Salida

Nuestra función tardó 13 segundos más que multiprocesar el Piscina. Incluso entonces, Paralelo es 4 minutos y 59 segundos más rápido que de serie procesar. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] Tiempos de CPU: usuario 44.2 s, sys: 2.92 s, total: 47.1 s Tiempo de pared: 4 min 4 s

Hay una mejor manera de procesar archivos grandes dividiéndolos en lotes y procesándolos en paralelo. Comencemos por crear una función por lotes que ejecutará un clean_function en un solo lote de valores. 

Función de procesamiento por lotes

def proc_batch(lote): volvemos [ texto_limpio(texto) para texto in lote ]

Dividir el archivo en lotes

La siguiente función dividirá el archivo en múltiples lotes según la cantidad de trabajadores. En nuestro caso, obtenemos 8 lotes. 

def archivo por lotes(matriz,n_trabajadores): file_len = len(matriz) lote_tamaño = round(file_len / n_workers) lotes = [ matriz[ix:ix+batch_size] para ix in tqdm(rango(0, longitud_archivo, tamaño_lote)) ] volvemos lotes lotes = archivo_lote(df['Descripción'],n_trabajadores) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Ejecutar procesamiento por lotes en paralelo

Finalmente, utilizaremos Paralelo y retrasado para procesar lotes. 

Nota: Para obtener una sola matriz de valores, tenemos que ejecutar la comprensión de listas como se muestra a continuación. 

 

%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing"))(retrasado(proc_batch) (lote) para lote in tqdm(lotes) ) df['Descripción'] = [j para i in lote_salida para j in i]

Salida

Hemos mejorado el tiempo de procesamiento. Esta técnica es famosa por procesar datos complejos y entrenar modelos de aprendizaje profundo. 

100% 8/8 [00:00<00:00, 2.19it/s] Tiempos de CPU: usuario 3.39 s, sys: 1.42 s, total: 4.81 s Tiempo de pared: 3 min 56 s

tqdm lleva el multiprocesamiento al siguiente nivel. Es simple y poderoso. Lo recomendaré a todos los científicos de datos. 

Primero eche un vistazo al sitio web de la página documentación para obtener más información sobre el multiprocesamiento. 

El process_map requiere:

  1. Nombre de la función
  2. Columna de marco de datos
  3. max_trabajadores
  4. chucksize es similar al tamaño del lote. Calcularemos el tamaño del lote utilizando el número de trabajadores o puede agregar el número según su preferencia. 
%%tiempo
en tqdm.contrib.concurrente importar process_map lote = round(len(df)/n_workers) df["Description"] = process_map(clean_text, df["Description"], max_workers=n_workers, chunksize=batch )

Salida

Con una sola línea de código, obtenemos el mejor resultado. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] Tiempos de CPU: usuario 7.32 s, sys: 1.97 s, total: 9.29 s Tiempo de pared: 3 min 51 s

Debe encontrar un equilibrio y seleccionar la técnica que mejor se adapte a su caso. Puede ser procesamiento en serie, paralelo o por lotes. El procesamiento paralelo puede ser contraproducente si está trabajando con un conjunto de datos más pequeño y menos complejo. 

En este minitutorial, hemos aprendido sobre varios paquetes y técnicas de Python que nos permiten procesar en paralelo nuestras funciones de datos. 

Si solo está trabajando con un conjunto de datos tabulares y desea mejorar el rendimiento de su procesamiento, le sugiero que intente Tablero, tabla de datosy RÁPIDOS 

Referencia 

 
 
Abid Ali Awan (@ 1abidaliawan) es un profesional científico de datos certificado al que le encanta crear modelos de aprendizaje automático. Actualmente, se está enfocando en la creación de contenido y escribiendo blogs técnicos sobre aprendizaje automático y tecnologías de ciencia de datos. Abid tiene una Maestría en Gestión de Tecnología y una licenciatura en Ingeniería de Telecomunicaciones. Su visión es construir un producto de IA utilizando una red neuronal gráfica para estudiantes que luchan contra enfermedades mentales.
 

Sello de tiempo:

Mas de nuggets