Równoległe przetwarzanie dużych plików w Pythonie

Równoległe przetwarzanie dużych plików w Pythonie

Węzeł źródłowy: 1970820

Równoległe przetwarzanie dużych plików w Pythonie
Zdjęcie autora
 

W przypadku przetwarzania równoległego dzielimy nasze zadanie na podjednostki. Zwiększa liczbę zadań przetwarzanych przez program i skraca całkowity czas przetwarzania. 

Na przykład, jeśli pracujesz z dużym plikiem CSV i chcesz zmodyfikować pojedynczą kolumnę. Przekażemy dane jako tablicę do funkcji, która będzie równolegle przetwarzać wiele wartości jednocześnie w oparciu o liczbę dostępnych  pracowników. Pracownicy ci bazują na liczbie rdzeni procesora. 
 

Uwaga: użycie przetwarzania równoległego na mniejszym zbiorze danych nie poprawi czasu przetwarzania.

 

W tym blogu dowiemy się, jak skrócić czas przetwarzania dużych plików za pomocą wieloprocesowe, Joblib job, tqdm Pakiety Pythona. Jest to prosty samouczek, który można zastosować do dowolnego pliku, bazy danych, obrazu, wideo i dźwięku. 
 

Uwaga: do eksperymentów używamy notatnika Kaggle. Czas przetwarzania może się różnić w zależności od maszyny.  

 

Będziemy używać Wypadki w USA (2016 – 2021) zbiór danych z Kaggle, który składa się z 2.8 miliona rekordów i 47 kolumn. 

Zaimportujemy multiprocessing, joblib, tqdm dla przetwarzanie równoległe, pandas dla pozyskiwanie danych, re, nltk, string dla przetwarzanie tekstu

# Równoległe obliczenia
importować wieloprocesowe as mp
od Joblib job importować Równoległy, opóźniony
od tqdm.notebook importować tqdm # Pozyskiwanie danych 
importować pandy as pd # Przetwarzanie tekstu 
importować re od nltk.korpus importować pomijane słowa
importować ciąg

Zanim wskoczymy od razu, umówmy się n_workers przez podwojenie cpu_count(). Jak widać, mamy 8 pracowników.

n_pracowników = 2 * mp.cpu_count() print(f"{n_pracowników} jest dostępnych") >>> Dostępnych jest 8 pracowników

W następnym kroku przetworzymy duże pliki CSV za pomocą pandy read_csv funkcjonować. Następnie wydrukuj kształt ramki danych, nazwy kolumn i czas przetwarzania. 

Uwaga: Magiczna funkcja Jupytera %%time może wyświetlić Czasy procesora i czas na ścianę na końcu procesu. 

 

%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Kształt:{df.shape}nnNazwy kolumn:n{df.columns}n")

Wydajność

Kształt:(2845342, 47) Nazwy kolumn: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Opis', 'Numer', 'Ulica', 'Strona', 'Miasto', 'Okręg', 'Stan', 'Kod pocztowy', 'Kraj', 'Strefa czasowa', 'Kod_lotniska', 'Pogoda_sygnatura czasowa', „Temperatura (F)”, „Wiatr_Chill(F)”, „Wilgotność (%)”, „Ciśnienie (w)”, „Widoczność (mi)”, „Kierunek_wiatru”, „Prędkość wiatru (mph)”, „Opady (w )”, „Warunki_pogody”, „Udogodnienia”, „Wyboje”, „Przejście”, „Ustąp pierwszeństwa”, „Skrzyżowanie”, „Brak zjazdu”, „Kolej”, „Rondo”, „Stacja”, „Stop”, „Uspokojenie ruchu” , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') Czasy procesora: użytkownik 33.9 s, system: 3.93 s, łącznie: 37.9 s Czas na ścianie : 46.9 sek

Połączenia clean_text to prosta funkcja do przetwarzania i czyszczenia tekstu. Dostaniemy angielski pomijane słowa za pomocą nltk.copus użyj go do odfiltrowania słów stop z wiersza tekstu. Następnie usuniemy ze zdania znaki specjalne i dodatkowe spacje. Będzie to funkcja bazowa określająca czas przetwarzania dla seryjny, równolegle, partia przetwarzanie. 

def czysty_tekst(tekst): # Usuń słowa stopu przystanki = stopwords.words("polski") tekst = " ".join([słowo dla słowo in tekst.split() if słowo nie in przystanki]) # Usuń znaki specjalne text = text.translate(str.maketrans('', '', string.punctuation)) # usuwanie dodatkowych spacji tekst = re.sub(' +',' ', tekst) powrót XNUMX

Do przetwarzania seryjnego możemy użyć pand .apply() funkcja, ale jeśli chcesz zobaczyć pasek postępu, musisz aktywować tqdm dla pandy a następnie użyj .progress_apply() funkcja. 

Zamierzamy przetworzyć 2.8 miliona rekordów i zapisać wynik z powrotem w kolumnie „Opis”. 

%%time tqdm.pandas() df['Opis'] = df['Opis'].progress_apply(czysty_tekst)

Wydajność

Zajęło to 9 minut i 5 sekund high-end procesor do przetwarzania szeregowego 2.8 miliona wierszy. 

100% 2845342/2845342 [09:05<00:00, 5724.25it/s] Czas procesora: użytkownik 8 min 14 s, system: 53.6 s, łącznie: 9 min 7 s Czas ściany: 9 min 5 s

Istnieją różne sposoby równoległego przetwarzania pliku, a my poznamy je wszystkie. The multiprocessing to wbudowany pakiet Pythona, który jest powszechnie używany do równoległego przetwarzania dużych plików. 

Stworzymy wieloprzetwarzanie pływacki w pracownicy 8 i użyj mapa funkcja do zainicjowania procesu. Aby wyświetlić paski postępu, używamy tqdm.

Funkcja mapy składa się z dwóch części. Pierwszy wymaga funkcji, a drugi argumentu lub listy argumentów. 

Dowiedz się więcej, czytając dokumentacja

%%time p = mp.Pool(n_workers) df['Opis'] = p.map(czysty_tekst,tqdm(df['Opis']))

Wydajność

Poprawiliśmy czas przetwarzania o prawie 3X. Czas przetwarzania spadł z 9 minut 5 sekund do 3 minut 51 sekund.   

100% 2845342/2845342 [02:58<00:00, 135646.12it/s] Czas procesora: użytkownik 5.68 s, system: 1.56 s, łącznie: 7.23 s Czas ściany: 3 min 51 s

Dowiemy się teraz o innym pakiecie Pythona do przetwarzania równoległego. W tej sekcji użyjemy Joblib's Parallel i opóźniony replikować mapa funkcja. 

  • Parallel wymaga dwóch argumentów: n_jobs = 8 i backend = multiprocessing.
  • Następnie dodamy czysty_tekst  do opóźniony funkcja. 
  • Utwórz pętlę, aby podawać pojedynczą wartość na raz. 

Poniższy proces jest dość ogólny i możesz modyfikować swoją funkcję i tablicę zgodnie z własnymi potrzebami. Użyłem go do bezproblemowego przetwarzania tysięcy plików audio i wideo. 

Polecamy: dodaj obsługę wyjątków za pomocą try: i except:

def tekst_parallel_clean(tablica): wynik = Parallel(n_jobs=n_workers,backend="multiprocessing")(opóźniony(czysty_tekst) (tekst) dla XNUMX in tqdm(tablica) ) powrót dalsze

Dodaj kolumnę „Opis” do text_parallel_clean()

%%time df['Opis'] = text_parallel_clean(df['Opis'])

Wydajność

Zajęło to naszej funkcji 13 sekund więcej niż wieloprocesowe Basen. Nawet wtedy, Parallel jest o 4 minuty i 59 sekund szybciej niż seryjny przetwarzanie. 

100% 2845342/2845342 [04:03<00:00, 10514.98it/s] Czas procesora: użytkownik 44.2 s, system: 2.92 s, łącznie: 47.1 s Czas ściany: 4 min 4 s

Istnieje lepszy sposób przetwarzania dużych plików, dzieląc je na partie i przetwarzając je równolegle. Zacznijmy od utworzenia funkcji wsadowej, która będzie uruchamiać clean_function na jednej partii wartości. 

Funkcja przetwarzania wsadowego

def proc_batch(partia): powrót [ czysty_tekst(tekst) dla XNUMX in seria ]

Dzielenie pliku na partie

Poniższa funkcja podzieli plik na wiele partii na podstawie liczby pracowników. W naszym przypadku otrzymujemy 8 partii. 

def plik_wsadowy(tablica, n_pracowników): dł_pliku = dł.(tablica) rozmiar_wsadu = round(długość_pliku / n_pracowników) wsadów = [ tablica[ix:ix+rozmiar_wsadu] dla ix in tqdm(zakres(0, długość_pliku, wielkość_wsadu))] powrót partie partie = plik_wsadowy(df['Opis'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]

Uruchamianie równoległego przetwarzania wsadowego

Wreszcie użyjemy Parallel i opóźniony do przetwarzania partii. 

Uwaga: Aby uzyskać pojedynczą tablicę wartości, musimy uruchomić rozumienie listy, jak pokazano poniżej. 

 

%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(opóźniony(proc_batch) (batch) dla partia in tqdm(partie) ) df['Opis'] = [j dla i in wsad_wyjście dla j in i]

Wydajność

Poprawiliśmy czas przetwarzania. Ta technika słynie z przetwarzania złożonych danych i uczenia modeli uczenia głębokiego. 

100% 8/8 [00:00<00:00, 2.19it/s] Czas procesora: użytkownik 3.39 s, system: 1.42 s, łącznie: 4.81 s Czas ściany: 3 min 56 s

tqdm przenosi przetwarzanie wieloprocesowe na wyższy poziom. Jest prosty i potężny. Będę go polecać każdemu naukowcowi zajmującemu się danymi. 

Zapoznaj się z dokumentacja aby dowiedzieć się więcej o przetwarzaniu wieloprocesowym. 

Połączenia process_map wymaga:

  1. Nazwa funkcji
  2. Kolumna ramki danych
  3. max_pracownicy
  4. chucksize jest podobny do rozmiaru partii. Obliczymy wielkość partii na podstawie liczby pracowników lub możesz dodać liczbę na podstawie swoich preferencji. 
%%czas
od tqdm.contrib.concurrent importować mapa_procesu partia = round(len(df)/n_workers) df["Opis"] = mapa_procesu( czysty_tekst, df["Opis"], max_workers=n_workers, chunksize=batch)

Wydajność

Za pomocą jednej linii kodu uzyskujemy najlepszy wynik. 

100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] Czas procesora: użytkownik 7.32 s, system: 1.97 s, łącznie: 9.29 s Czas ściany: 3 min 51 s

Musisz znaleźć równowagę i wybrać technikę, która najlepiej sprawdzi się w Twoim przypadku. Może to być przetwarzanie szeregowe, równoległe lub przetwarzanie wsadowe. Przetwarzanie równoległe może się odwrócić, jeśli pracujesz z mniejszym, mniej złożonym zestawem danych. 

W tym minisamouczku poznaliśmy różne pakiety i techniki Pythona, które pozwalają nam na równoległe przetwarzanie naszych funkcji danych. 

Jeśli pracujesz tylko z tabelarycznym zbiorem danych i chcesz poprawić wydajność przetwarzania, sugeruję, abyś spróbował Deska rozdzielcza, Tabela danych, KATARAKTY NA RZECE 

Numer Referencyjny 

 
 
Abid Ali Awan (@ 1abidaliawan) jest certyfikowanym specjalistą ds. analityków danych, który uwielbia tworzyć modele uczenia maszynowego. Obecnie koncentruje się na tworzeniu treści i pisaniu blogów technicznych na temat technologii uczenia maszynowego i data science. Abid posiada tytuł magistra zarządzania technologią oraz tytuł licencjata inżynierii telekomunikacyjnej. Jego wizją jest zbudowanie produktu AI z wykorzystaniem grafowej sieci neuronowej dla studentów zmagających się z chorobami psychicznymi.
 

Znak czasu:

Więcej z Knuggety