이 기사는 데이터 과학 Blogathon
개요
Spark는 빅 데이터 처리를 위해 전 세계의 데이터 과학자들이 사용하는 분석 엔진입니다. Hadoop을 기반으로 하며 일괄 처리 및 스트리밍 데이터 처리가 가능합니다. Hadoop은 클러스터의 여러 노드에 걸쳐 데이터를 분할한 다음 데이터를 병렬로 계산하기 위해 자체 컴퓨팅 리소스를 사용하는 분산 컴퓨팅을 위한 프레임워크입니다. 이것은 오픈 소스 소프트웨어이며 번개처럼 빠르게 작동하므로 빅 데이터 처리에 널리 사용됩니다.
시작하기 전에 Spark에 어느 정도 익숙하고 빅 데이터를 처리하는 작은 응용 프로그램에 대해 작업했다고 가정합니다. 또한 Spark RDD, Spark DataFrame에 대한 지식과 관계형 데이터베이스 및 SQL에 대한 기본 이해는 이 기사를 계속 진행하는 데 도움이 됩니다.
스파크 촉매 옵티마이저
우리는 spark 2의 촉매 최적화 프로그램을 이해하는 것으로 이 기사를 시작하고 병렬로 데이터를 처리하기 위해 논리적 및 물리적 계획을 생성하는 방법을 살펴보겠습니다.
Spark 2에는 번개처럼 빠른 실행을 제공하는 촉매 최적화 도구가 포함되어 있습니다. 촉매 옵티마이저는 spark SQL 및 DataFrame API를 구동하는 최적화 엔진입니다.
촉매 최적화 프로그램에 대한 입력은 처리해야 하는 SQL 쿼리 또는 DataFrame API 메서드일 수 있습니다. 이를 입력 관계라고 합니다. SQL 쿼리의 결과가 spark DataFrame이기 때문에 둘 다 비슷하다고 생각할 수 있습니다. 이러한 입력을 사용하여 촉매 최적화 프로그램은 논리적 최적화 계획을 제시합니다. 하지만 이 단계에서는 컬럼의 종류를 고려하지 않기 때문에 논리적인 계획이 미해결이라고 합니다. 사실 이 단계에서 옵티마이저는 컬럼의 존재를 인지하지 못합니다.
이것은 카탈로그가 그림으로 들어오는 곳입니다. 카탈로그에는 모든 데이터 소스의 모든 테이블에 대한 세부 정보가 카탈로그 형식으로 포함되어 있습니다. 카탈로그는 논리적 계획의 입력 및 결과 분석을 수행하는 데 사용됩니다.
이 시점 이후에 실제 최적화가 수행됩니다. 여기에서 가능한 최적화를 찾기 위해 입력이 전달됩니다. 이러한 단계에는 쿼리가 보다 효율적으로 실행되도록 쿼리를 단순화하기 위해 프로젝션 정리 및 표현식 단순화가 포함될 수 있습니다. 그런 다음 옵티마이저는 서로 다른 조합으로 서로 다른 최적화를 제시하고 논리적 계획 모음을 생성합니다.
그런 다음 각 계획의 비용이 계산됩니다. 리소스 및 실행 시간 측면에서 비용이 가장 낮은 논리적 계획이 선택됩니다. 논리적 계획을 선택한 후에는 사용 가능한 리소스를 고려하여 물리적 계획으로 변환해야 합니다. 따라서 입력으로 가장 낮은 비용의 논리적 계획이 생성되고 물리적 계획의 수가 텅스텐 엔진을 사용하여 각각에 대한 비용이 계산됩니다. 비용 계산에는 사용 가능한 리소스, 전체 성능 및 각 물리적 계획에 대한 리소스 사용 효율성을 비롯한 여러 매개변수가 포함됩니다. 이 단계의 출력은 Sparks 실행 엔진에서 실행될 Java 바이트 코드입니다. 이것은 촉매 최적화기의 최종 출력입니다.
촉매 옵티마이저의 높은 수준의 논리적 개요 다이어그램은 다음과 같습니다.
스파크 SQL 소개
DataFrame API를 사용하여 Spark DataFrame에서 수행할 수 있는 몇 가지 작업이 있습니다. Spark DataFrame의 다양한 행과 열을 사용하여 다양한 변환을 수행할 수 있습니다. 집계 및 윈도우 작업도 수행할 수 있습니다.
관계형 데이터베이스 및 SQL 작업에 대한 배경 지식이 있는 사용자는 관계형 테이블을 사용하는 DataFrame에 익숙할 것입니다. Spark SQL에서 쿼리를 작성하여 여러 분석 작업을 수행할 수 있습니다.
Spark DataFrameas를 관계형 데이터베이스 테이블로 취급하는 방법을 이해할 수 있는 몇 가지 예를 보여 드리겠습니다.
스파크 세션 생성
이를 위해 시스템에서 스파크를 설정해야 하며 스파크 콘솔에 로그인한 후 예제를 수행하기 위해 다음 패키지를 가져와야 합니다.
pyspark.sql에서 가져오기 pyspark.sql.types에서 SparkSession 가져오기 * pyspark.sql.functions에서 가져오기 * pyspark.sql.types에서 가져오기 행 날짜/시간 가져오기 날짜/시간
필요한 가져오기 후에 다음 명령으로 스파크 세션을 초기화해야 합니다.
spark = SparkSession.builder.appName("파이썬 스파크 SQL 기본 예제").config("spark.some.config.option", "some-value").getOrCreate()
그런 다음 병렬화 기능을 사용하여 Spark RDD를 생성합니다. 이 RDD에는 XNUMX명의 학생에 대한 XNUMX개의 행이 포함되어 있으며 값은 자명합니다.
Student_records = sc.parallelize([Row(roll_no=1,name='John Doe',passed=True,marks={'Math':89,'Physics':87,'Chemistry':81},sports =[' 체스','축구'], DoB=datetime(2012,5,1,13,1,5)), Row(roll_no=2,name='John Smith',passed=False,marks={'수학': 29,'물리학':31,'화학':36}, 스포츠 =['배구','탁구'], DoB=datetime(2012,5,12,14,2,5))])
데이터프레임 생성
이 RDD에서 DataFrame을 만들고 명령에 따라 결과 DataFrame을 표시해 보겠습니다.
학생_기록_df = 학생_기록.toDF() 학생_기록_df.show()
이제 열 'marks'의 내용이 잘린 것을 볼 수 있습니다. 전체 콘텐츠를 보려면 다음 명령을 실행할 수 있습니다.
Student_records_df.show(truncate=False)
임시 보기 만들기
위의 DataFrame은 관계형 테이블로 취급할 수 있습니다. 이를 위해 다음 명령을 사용하여 생성된 스파크 세션에 유효한 '레코드'라는 관계형 보기를 생성할 수 있습니다.
학생_레코드_df.createOrReplaceTempView('레코드')
이제 이 보기에 대해 SQL 쿼리를 실행하고 결과를 표시할 시간입니다.
spark.sql("SELECT * FROM 레코드").show()
여기에서 spark.sql이 Spark DataFrame을 반환하는지 확인할 수 있습니다.
DataFrame 내에서 목록 또는 사전의 요소에 액세스
RDD를 만드는 동안 사전 데이터 구조로 'marks' 필드를 채우고 목록 데이터 구조로 'sports' 필드를 채웠습니다. 해당 사전과 목록에서 특정 요소를 선택하는 SQL 쿼리를 작성할 수 있습니다.
spark.sql('SELECT roll_no, mark["Physics"], sports[1] FROM 레코드').show()
목록에서 요소의 위치를 지정하거나 사전의 경우를 지정할 수 있으며 키를 사용하여 요소에 액세스합니다.
Where 절
다음 예에서 where 절의 사용을 살펴보겠습니다.
spark.sql("SELECT * FROM 레코드가 전달된 위치 = True").show()
위의 예에서는 '통과' 열에 부울 값이 True인 행을 선택했습니다.
데이터 구조 필드의 값을 사용하여 where 절을 작성할 수도 있습니다. 다음 예에서는 마크 사전의 'Chemistry' 키를 사용하고 있습니다.
spark.sql('SELECT * FROM 레코드 WHEREmarks["화학"] < 40').show()
글로벌 뷰 생성
위에서 생성한 'records' 보기는 현재 세션에 대한 범위만 있습니다. 세션이 사라지면 보기가 종료되고 액세스할 수 없습니다. 그러나 뷰를 생성한 세션이 종료되더라도 동일한 애플리케이션에서 시작된 다른 세션이 뷰에 액세스할 수 있도록 하려면 다음 명령을 사용하여 전역 뷰를 만듭니다.
학생_레코드_df.createGlobalTempView('글로벌_레코드')
이 보기의 범위는 세션 수준이 아닌 응용 프로그램 수준입니다. 이제 이 전역 보기에서 선택 쿼리를 실행해 보겠습니다.
spark.sql("SELECT * FROM global_temp.global_records").show()
모든 전역 보기는 global_temp라는 데이터베이스에 보존됩니다.
DataFrame에서 열 삭제
DataFrame의 열만 보려면 다음 명령을 사용할 수 있습니다.
학생_레코드_df.columns
열을 삭제하려면 drop 명령을 사용할 수 있습니다. 데이터 세트에서 '통과' 열을 삭제해 보겠습니다.
학생_기록_df = 학생_기록_df.drop('통과')
이제 DataFrame에 더 이상 '통과' 열이 없음을 알 수 있습니다.
몇 가지 추가 쿼리
각 학생의 평균 점수를 표시하는 열을 만들어 보겠습니다.
spark.sql("라운드((marks.Physics+marks.Chemistry+marks.Math)/3) avg_marks FROM 레코드 선택").show()
이제 이 열을 기존 DataFrame에 추가합니다.
student_records_df=spark.sql("SELECT *, round( (marks.Physics+marks.Chemistry+marks.Math)/3) avg_marks FROM 레코드") student_records_df.show()
이전에 '통과' 열을 삭제했습니다. 평균 점수를 계산한 후 상태를 '통과' 또는 '실패'로 지정하고 평균 점수가 40%보다 큰지 확인하는 '상태'라는 새 열을 파생할 수 있습니다.
이를 수행하려면 먼저 뷰를 다시 업데이트해야 합니다.
학생_레코드_df.createOrReplaceTempView('레코드')
다음 쿼리를 통해 이를 달성할 수 있습니다.
학생_기록_df = 학생_기록_df.withColumn('상태',(때(col('avg_marks')>=40, '통과')).otherwise('실패')) 학생_기록_df.show()
위의 명령은 내부에 정의된 작업을 실행하여 기존 DataFrame에 새 열을 추가합니다.
그룹화 기준 및 집계
Spark SQL의 몇 가지 기능을 더 살펴보겠습니다. 이를 위해서는 새로운 DataFrame을 가져와야 합니다. 직원 레코드로 새 DataFrame을 생성해 보겠습니다.
employeeData =(('John','HR','NY',90000,34,10000), ('Neha','HR','NY',86000,28,20000), ('Robert','판매 ','CA',81000,56,22000), ('마리아','판매','CA',99000,45,15000), ('폴','IT','NY',98000,38,14000, 90000,34,20000), ('젠','IT','CA',93000,28,28000), ('라지','IT','CA',95000,31,19000), ('푸자','IT ','CA',XNUMX)) columns = ('employee_name','department','state','salary','age','bonus') employeeDf = spark.createDataFrame(employeeData, columns)
부서별 총 급여를 조회하려면 다음과 같이 할 수 있습니다.
직원Df.groupby(col('부서')).agg(sum(col('급여'))).show()
결과는 부서별 총 급여를 보여줍니다. 총 급여를 순서대로 보고 싶다면 다음과 같은 방법으로 달성할 수 있습니다.
employeeDf.groupby(col('department')).agg(sum(col('salary')).alias('total_sal')).orderBy('total_sal').show()
여기에서 전체 급여가 오름차순으로 표시됩니다. 이것을 내림차순으로 보려면 다음 명령을 실행해야 합니다.
직원Df.groupby(col('부서')).agg(sum(col('급여')).alias('total_sal')).orderBy(col('total_sal').desc()).show()
한 번에 여러 DataFrame 열에 대해 그룹화 및 집계를 수행할 수 있습니다.
직원Df.groupby(col('부서'),col('주')).agg(sum(col('보너스'))).show()
다음과 같은 방법으로 한 번에 더 많은 집계를 실행할 수 있습니다.
employeeDf.groupby(col('department')).agg(avg(col('salary')).alias('avarage_salary'),max(col('bonus')).alias('maximum_bonus')).show ()
스파크에서 윈도잉
창 함수를 사용하면 입력 행 범위에서 주어진 행의 순위와 같은 결과를 계산할 수 있습니다.
각 부서의 두 번째로 높은 급여를 계산하려고 한다고 가정합니다. 이러한 시나리오에서는 스파크 창 기능을 사용할 수 있습니다.
스파크에서 윈도우를 사용하려면 pyspark.sql.window에서 Window 패키지를 가져와야 하고 다음을 작성할 수 있습니다.
pyspark.sql.window에서 가져오기 창 windowSpec = Window.partitionBy("department").orderBy(col("salary").desc()) employeeDf = employeeDf.withColumn("rank",dense_rank().over(windowSpec) ) employeeDf.filter(col('순위') == 2).show()
위의 명령 시퀀스에서 먼저 pyspark.sql.window에서 Window 패키지를 가져왔습니다.
그런 다음 윈도우에 대한 사양을 정의했습니다.
다음으로 DataFrame의 창 기능을 수행하고 부서별 최고 급여를 표시하는 새로운 열 순위를 추가했습니다.
마지막으로 순위가 2인 DataFrame을 필터링하여 모든 부서에서 두 번째로 높은 급여를 표시하는 명령을 실행했습니다.
스파크에서 조인
조인을 수행하기 위해 각 부서의 관리자를 포함하는 다른 데이터 세트를 생성해 보겠습니다.
관리자 = (('영업','마리아'),('인사','존'),('IT','푸자')) mg_columns = ('부서', '관리자') managerDf = spark.createDataFrame( 관리자, mg_columns) managerDf.show()
이제 각 직원의 관리자 이름을 보려면 다음 명령을 실행할 수 있습니다.
employeeDf.join(managerDf, employeeDf['department'] == managerDf['department'], how='inner').select(col('employee_name'),col('manager')).show()
조인 방법으로 두 DataFrame의 조인을 수행할 수 있습니다. 조인을 수행할 열과 수행하려는 조인 유형(내부, 왼쪽, 오른쪽 등)을 조인 메서드 내에서 지정해야 합니다.
결론
이 기사에서는 Spark SQL의 기본 사항, 번개처럼 빠르게 작동하는 이유 및 Spark SQL을 사용하여 Spark DataFrame을 조작하는 방법을 배웠습니다. 또한 데이터를 분할하고 논리적으로 정렬하는 방법과 마지막으로 조인을 사용하여 여러 DataFrame으로 작업하는 방법을 배웠습니다.
읽어 주셔서 감사합니다. 이러한 기술이 데이터에 대한 복잡한 분석을 빠르게 수행하는 데 도움이 되기를 바랍니다.
행복한 배움!!
이 기사에 표시된 미디어는 Analytics Vidhya의 소유가 아니며 작성자의 재량에 따라 사용됩니다.
관련
출처: https://www.analyticsvidhya.com/blog/2021/08/an-introduction-to-data-analysis-using-spark-sql/
- ACCESS
- 계정
- All
- 분석
- 분석
- API를
- API
- 어플리케이션
- 어플리케이션
- 기사
- 기초
- 빅 데이터
- 건축업자
- 화학
- 체스
- 단
- 컴퓨팅
- 함유량
- 만들기
- Current
- 데이터
- 데이터 분석
- 데이터 처리
- 데이터베이스
- 데이터베이스
- 분산 컴퓨팅
- 암사슴
- 드롭
- 떨어 뜨린
- 효율성
- 종료
- 등
- 실행
- FAST
- 최종적으로
- 먼저,
- 축구
- 형태
- 뼈대
- 가득 찬
- 기능
- 글로벌
- 그룹
- 하둡
- 방법
- How To
- hr
- HTTPS
- 포함
- IT
- 자바
- 어울리다
- 키
- 배운
- 레벨
- 번개
- 명부
- math
- 미디어
- 노드
- NY
- 행정부
- 선택권
- 주문
- 기타
- 성능
- 물리적
- 물리학
- .
- Python
- 범위
- 읽기
- 기록
- 의지
- 제품 자료
- 결과
- 반품
- ROBERT
- 달리기
- 판매
- 과학
- 과학자
- 선택된
- 세트
- 기술
- 작은
- So
- 소프트웨어
- 속도
- 스포츠
- SQL
- 단계
- 스타트
- 주 정부
- Status
- 스트리밍
- 학생
- 체계
- 일시적인
- 기초
- 시간
- 상단
- 치료
- 업데이트
- us
- 가치
- 관측
- 누구
- 이내
- 작업
- 일
- 세계
- 쓰기