Zdjęcie autora
W przypadku przetwarzania równoległego dzielimy nasze zadanie na podjednostki. Zwiększa liczbę zadań przetwarzanych przez program i skraca całkowity czas przetwarzania.
Na przykład, jeśli pracujesz z dużym plikiem CSV i chcesz zmodyfikować pojedynczą kolumnę. Przekażemy dane jako tablicę do funkcji, która będzie równolegle przetwarzać wiele wartości jednocześnie w oparciu o liczbę dostępnych pracowników. Pracownicy ci bazują na liczbie rdzeni procesora.
Uwaga: użycie przetwarzania równoległego na mniejszym zbiorze danych nie poprawi czasu przetwarzania.
W tym blogu dowiemy się, jak skrócić czas przetwarzania dużych plików za pomocą wieloprocesowe, Joblib job, tqdm Pakiety Pythona. Jest to prosty samouczek, który można zastosować do dowolnego pliku, bazy danych, obrazu, wideo i dźwięku.
Uwaga: do eksperymentów używamy notatnika Kaggle. Czas przetwarzania może się różnić w zależności od maszyny.
Będziemy używać Wypadki w USA (2016 – 2021) zbiór danych z Kaggle, który składa się z 2.8 miliona rekordów i 47 kolumn.
Zaimportujemy multiprocessing
, joblib
, tqdm
dla przetwarzanie równoległe, pandas
dla pozyskiwanie danych, re
, nltk
, string
dla przetwarzanie tekstu.
# Równoległe obliczenia importować wieloprocesowe as mp od Joblib job importować Równoległy, opóźniony od tqdm.notebook importować tqdm # Pozyskiwanie danych importować pandy as pd # Przetwarzanie tekstu importować re od nltk.korpus importować pomijane słowa importować ciąg
Zanim wskoczymy od razu, umówmy się n_workers
przez podwojenie cpu_count()
. Jak widać, mamy 8 pracowników.
n_pracowników = 2 * mp.cpu_count() print(f"{n_pracowników} jest dostępnych") >>> Dostępnych jest 8 pracowników
W następnym kroku przetworzymy duże pliki CSV za pomocą pandy read_csv
funkcjonować. Następnie wydrukuj kształt ramki danych, nazwy kolumn i czas przetwarzania.
Uwaga: Magiczna funkcja Jupytera
%%time
może wyświetlić Czasy procesora i czas na ścianę na końcu procesu.
%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Kształt:{df.shape}nnNazwy kolumn:n{df.columns}n")
Wydajność
Kształt:(2845342, 47) Nazwy kolumn: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi) ', 'Opis', 'Numer', 'Ulica', 'Strona', 'Miasto', 'Okręg', 'Stan', 'Kod pocztowy', 'Kraj', 'Strefa czasowa', 'Kod_lotniska', 'Pogoda_sygnatura czasowa', „Temperatura (F)”, „Wiatr_Chill(F)”, „Wilgotność (%)”, „Ciśnienie (w)”, „Widoczność (mi)”, „Kierunek_wiatru”, „Prędkość wiatru (mph)”, „Opady (w )”, „Warunki_pogody”, „Udogodnienia”, „Wyboje”, „Przejście”, „Ustąp pierwszeństwa”, „Skrzyżowanie”, „Brak zjazdu”, „Kolej”, „Rondo”, „Stacja”, „Stop”, „Uspokojenie ruchu” , 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') Czasy procesora: użytkownik 33.9 s, system: 3.93 s, łącznie: 37.9 s Czas na ścianie : 46.9 sek
Połączenia clean_text
to prosta funkcja do przetwarzania i czyszczenia tekstu. Dostaniemy angielski pomijane słowa za pomocą nltk.copus
użyj go do odfiltrowania słów stop z wiersza tekstu. Następnie usuniemy ze zdania znaki specjalne i dodatkowe spacje. Będzie to funkcja bazowa określająca czas przetwarzania dla seryjny, równolegle, partia przetwarzanie.
def czysty_tekst(tekst): # Usuń słowa stopu przystanki = stopwords.words("polski") tekst = " ".join([słowo dla słowo in tekst.split() if słowo nie in przystanki]) # Usuń znaki specjalne text = text.translate(str.maketrans('', '', string.punctuation)) # usuwanie dodatkowych spacji tekst = re.sub(' +',' ', tekst) powrót XNUMX
Do przetwarzania seryjnego możemy użyć pand .apply()
funkcja, ale jeśli chcesz zobaczyć pasek postępu, musisz aktywować tqdm dla pandy a następnie użyj .progress_apply()
funkcja.
Zamierzamy przetworzyć 2.8 miliona rekordów i zapisać wynik z powrotem w kolumnie „Opis”.
%%time tqdm.pandas() df['Opis'] = df['Opis'].progress_apply(czysty_tekst)
Wydajność
Zajęło to 9 minut i 5 sekund high-end procesor do przetwarzania szeregowego 2.8 miliona wierszy.
100% 2845342/2845342 [09:05<00:00, 5724.25it/s] Czas procesora: użytkownik 8 min 14 s, system: 53.6 s, łącznie: 9 min 7 s Czas ściany: 9 min 5 s
Istnieją różne sposoby równoległego przetwarzania pliku, a my poznamy je wszystkie. The multiprocessing
to wbudowany pakiet Pythona, który jest powszechnie używany do równoległego przetwarzania dużych plików.
Stworzymy wieloprzetwarzanie pływacki w pracownicy 8 i użyj mapa funkcja do zainicjowania procesu. Aby wyświetlić paski postępu, używamy tqdm.
Funkcja mapy składa się z dwóch części. Pierwszy wymaga funkcji, a drugi argumentu lub listy argumentów.
Dowiedz się więcej, czytając dokumentacja.
%%time p = mp.Pool(n_workers) df['Opis'] = p.map(czysty_tekst,tqdm(df['Opis']))
Wydajność
Poprawiliśmy czas przetwarzania o prawie 3X. Czas przetwarzania spadł z 9 minut 5 sekund do 3 minut 51 sekund.
100% 2845342/2845342 [02:58<00:00, 135646.12it/s] Czas procesora: użytkownik 5.68 s, system: 1.56 s, łącznie: 7.23 s Czas ściany: 3 min 51 s
Dowiemy się teraz o innym pakiecie Pythona do przetwarzania równoległego. W tej sekcji użyjemy Joblib's Parallel i opóźniony replikować mapa funkcja.
- Parallel wymaga dwóch argumentów: n_jobs = 8 i backend = multiprocessing.
- Następnie dodamy czysty_tekst do opóźniony funkcja.
- Utwórz pętlę, aby podawać pojedynczą wartość na raz.
Poniższy proces jest dość ogólny i możesz modyfikować swoją funkcję i tablicę zgodnie z własnymi potrzebami. Użyłem go do bezproblemowego przetwarzania tysięcy plików audio i wideo.
Polecamy: dodaj obsługę wyjątków za pomocą try:
i except:
def tekst_parallel_clean(tablica): wynik = Parallel(n_jobs=n_workers,backend="multiprocessing")(opóźniony(czysty_tekst) (tekst) dla XNUMX in tqdm(tablica) ) powrót dalsze
Dodaj kolumnę „Opis” do text_parallel_clean()
.
%%time df['Opis'] = text_parallel_clean(df['Opis'])
Wydajność
Zajęło to naszej funkcji 13 sekund więcej niż wieloprocesowe Basen. Nawet wtedy, Parallel jest o 4 minuty i 59 sekund szybciej niż seryjny przetwarzanie.
100% 2845342/2845342 [04:03<00:00, 10514.98it/s] Czas procesora: użytkownik 44.2 s, system: 2.92 s, łącznie: 47.1 s Czas ściany: 4 min 4 s
Istnieje lepszy sposób przetwarzania dużych plików, dzieląc je na partie i przetwarzając je równolegle. Zacznijmy od utworzenia funkcji wsadowej, która będzie uruchamiać clean_function
na jednej partii wartości.
Funkcja przetwarzania wsadowego
def proc_batch(partia): powrót [ czysty_tekst(tekst) dla XNUMX in seria ]
Dzielenie pliku na partie
Poniższa funkcja podzieli plik na wiele partii na podstawie liczby pracowników. W naszym przypadku otrzymujemy 8 partii.
def plik_wsadowy(tablica, n_pracowników): dł_pliku = dł.(tablica) rozmiar_wsadu = round(długość_pliku / n_pracowników) wsadów = [ tablica[ix:ix+rozmiar_wsadu] dla ix in tqdm(zakres(0, długość_pliku, wielkość_wsadu))] powrót partie partie = plik_wsadowy(df['Opis'],n_workers) >>> 100% 8/8 [00:00<00:00, 280.01it/s]
Uruchamianie równoległego przetwarzania wsadowego
Wreszcie użyjemy Parallel i opóźniony do przetwarzania partii.
Uwaga: Aby uzyskać pojedynczą tablicę wartości, musimy uruchomić rozumienie listy, jak pokazano poniżej.
%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(opóźniony(proc_batch) (batch) dla partia in tqdm(partie) ) df['Opis'] = [j dla i in wsad_wyjście dla j in i]
Wydajność
Poprawiliśmy czas przetwarzania. Ta technika słynie z przetwarzania złożonych danych i uczenia modeli uczenia głębokiego.
100% 8/8 [00:00<00:00, 2.19it/s] Czas procesora: użytkownik 3.39 s, system: 1.42 s, łącznie: 4.81 s Czas ściany: 3 min 56 s
tqdm przenosi przetwarzanie wieloprocesowe na wyższy poziom. Jest prosty i potężny. Będę go polecać każdemu naukowcowi zajmującemu się danymi.
Zapoznaj się z dokumentacja aby dowiedzieć się więcej o przetwarzaniu wieloprocesowym.
Połączenia process_map
wymaga:
- Nazwa funkcji
- Kolumna ramki danych
- max_pracownicy
- chucksize jest podobny do rozmiaru partii. Obliczymy wielkość partii na podstawie liczby pracowników lub możesz dodać liczbę na podstawie swoich preferencji.
%%czas od tqdm.contrib.concurrent importować mapa_procesu partia = round(len(df)/n_workers) df["Opis"] = mapa_procesu( czysty_tekst, df["Opis"], max_workers=n_workers, chunksize=batch)
Wydajność
Za pomocą jednej linii kodu uzyskujemy najlepszy wynik.
100% 2845342/2845342 [03:48<00:00, 1426320.93it/s] Czas procesora: użytkownik 7.32 s, system: 1.97 s, łącznie: 9.29 s Czas ściany: 3 min 51 s
Musisz znaleźć równowagę i wybrać technikę, która najlepiej sprawdzi się w Twoim przypadku. Może to być przetwarzanie szeregowe, równoległe lub przetwarzanie wsadowe. Przetwarzanie równoległe może się odwrócić, jeśli pracujesz z mniejszym, mniej złożonym zestawem danych.
W tym minisamouczku poznaliśmy różne pakiety i techniki Pythona, które pozwalają nam na równoległe przetwarzanie naszych funkcji danych.
Jeśli pracujesz tylko z tabelarycznym zbiorem danych i chcesz poprawić wydajność przetwarzania, sugeruję, abyś spróbował Deska rozdzielcza, Tabela danych, KATARAKTY NA RZECE
Numer Referencyjny
Abid Ali Awan (@ 1abidaliawan) jest certyfikowanym specjalistą ds. analityków danych, który uwielbia tworzyć modele uczenia maszynowego. Obecnie koncentruje się na tworzeniu treści i pisaniu blogów technicznych na temat technologii uczenia maszynowego i data science. Abid posiada tytuł magistra zarządzania technologią oraz tytuł licencjata inżynierii telekomunikacyjnej. Jego wizją jest zbudowanie produktu AI z wykorzystaniem grafowej sieci neuronowej dla studentów zmagających się z chorobami psychicznymi.
- Dystrybucja treści i PR oparta na SEO. Uzyskaj wzmocnienie już dziś.
- Platoblockchain. Web3 Inteligencja Metaverse. Wzmocniona wiedza. Dostęp tutaj.
- Źródło: https://www.kdnuggets.com/2022/07/parallel-processing-large-file-python.html?utm_source=rss&utm_medium=rss&utm_campaign=parallel-processing-large-file-in-python
- 1
- 10
- 11
- 2016
- 2021
- 39
- 7
- 9
- a
- O nas
- wypadki
- Stosownie
- Po
- AI
- Wszystkie kategorie
- i
- Inne
- Aplikuj
- argument
- argumenty
- Szyk
- audio
- dostępny
- z powrotem
- Backend
- Bilans
- bar
- paski
- na podstawie
- Baseline
- poniżej
- BEST
- Ulepsz Swój
- Blog
- blogi
- budować
- Budowanie
- wbudowany
- obliczać
- walizka
- Dyplomowani
- znaków
- Miasto
- Sprzątanie
- kod
- Kolumna
- kolumny
- powszechnie
- kompleks
- równoległy
- zawartość
- kraj
- hrabstwo
- CPU
- Stwórz
- Tworzenie
- tworzenie
- Obecnie
- dane
- nauka danych
- naukowiec danych
- Baza danych
- głęboko
- głęboka nauka
- Stopień
- opóźniony
- opis
- Ustalać
- Wyświetlacz
- podwojenie
- porzucone
- Inżynieria
- Angielski
- Eter (ETH)
- Każdy
- przykład
- wyjątek
- dodatkowy
- sławny
- szybciej
- filet
- Akta
- filtrować
- Znajdź
- i terminów, a
- skupienie
- od
- funkcjonować
- Funkcje
- otrzymać
- GitHub
- będzie
- wykres
- Wykres sieci neuronowej
- Prowadzenie
- posiada
- W jaki sposób
- How To
- HTML
- HTTPS
- choroba
- obraz
- importować
- podnieść
- ulepszony
- in
- Zwiększenia
- zainicjować
- problem
- IT
- Oferty pracy
- skok
- Knuggety
- duży
- UCZYĆ SIĘ
- dowiedziałem
- nauka
- poziom
- Linia
- Lista
- maszyna
- uczenie maszynowe
- magia
- i konserwacjami
- mapa
- mistrz
- psychika
- Choroba umysłowa
- milion
- minuty
- modele
- modyfikować
- jeszcze
- wielokrotność
- Nazwa
- Nazwy
- Potrzebować
- wymagania
- sieć
- Nerwowy
- sieci neuronowe
- Następny
- notatnik
- numer
- przedmiot
- ogólny
- pakiet
- Pakiety
- pandy
- Parallel
- wykonać
- jest gwarancją najlepszej jakości, które mogą dostarczyć Ci Twoje monitory,
- plato
- Analiza danych Platona
- PlatoDane
- mocny
- wygląda tak
- przetwarzanie
- Procesor
- Produkt
- profesjonalny
- Program
- Postęp
- Python
- Kolej żelazna
- RE
- Czytający
- polecić
- dokumentacja
- zmniejszyć
- zmniejsza
- usunąć
- usuwanie
- Wymaga
- dalsze
- run
- Zapisz
- nauka
- Naukowiec
- druga
- sekund
- Sekcja
- działy
- wyrok
- seryjny
- zestaw
- Shape
- pokazane
- podobny
- Prosty
- pojedynczy
- Rozmiar
- mniejszy
- obowiązuje
- specjalny
- dzielić
- początek
- Stan
- stacja
- Ewolucja krok po kroku
- Stop
- Zatrzymuje
- bezpośredni
- ulica
- Walka
- Studenci
- trwa
- Zadanie
- Techniczny
- Techniki
- Technologies
- Technologia
- telekomunikacja
- Połączenia
- tysiące
- czas
- czasy
- stref czasowych
- do
- Kwota produktów:
- Trening
- Tutorial
- us
- posługiwać się
- Użytkownik
- wartość
- Wartości
- różnorodny
- Wideo
- wizja
- sposoby
- który
- KIM
- będzie
- w ciągu
- bez
- słowo
- słowa
- pracowników
- pracujący
- działa
- pisanie
- Twój
- zefirnet