진지한 개발자

PySpark 특징 및 장점 본문

IT/spark

PySpark 특징 및 장점

제이_엔 2023. 7. 31. 14:40
728x90
  • 스파크에서 사용되는 분산 콜렉션 API
    • DataFrame : DataFrame is like a partitioned table.
      • Row 타입의 레코드와 칼럼으로 구성 됨
      • DataFrame 의 파티셔닝은 DataFrame이 클러스터에서 물리적으로 배치되는 형태를 정의 (물리적 분산저장) 
      • 데이터 타입 검증 시기 : 런타임에서 확인
      • 지원 : 모두 지원
      • Row 접근 : Row 타입을 직렬화된 바이너리 구조로 변환. 스파크의 최적화된 내부 포맷: 일반적으로 가장 빠름
      • DataFrame의 최적화
        1. sortWithinPartitions : 트랜스포메이션 처리 전에 파티션별 정렬을 수행하여 DataFrame 을 최적화 함

          • spark.read.load('file.json').sortWithinPartitions('count')
        2. repartition
          1. repartition 메서드를 호출하면 전체 데이터를 셔플 함
          2. 이후 작업이 추가실행되면서 Spark 내부 옵티마이저는 최적의 성능을 위해 파티션 구성을 동적으로 조정하기 때문
          3. 사용자가 지정한 파티션 구성이 유지되길 원한다면, 사용자가 명시적 지정 파티션 구성 실행 이후에 cache() 또는 persist() 메서드를 호출하여 결과를 캐시해야 함
          4. repartition은 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 함. 특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링 되는 컬럼 기준으로 파티션을 재분배하는 것이 좋음
          5. 셔플링 작업이 최소화되도록 Executor 노드 간의 통신이 적어지도록 할수록 효율적
          6. 리파티션을 적용할 컬럼으로 executor 노드에 적절히 분배될 수 있는 컬럼을 골라서 executor 노드의 개수와 컬럼 안의 unique value 의 개수가 비슷할수록 좋음 
        3. COUNTING practice
          1. count 함수 사용 할때는
            select 구문에 표현식으로 작성하는 것이 아니라 agg 메서드에 작성하는게 좋음
            df.select(count(col("colB"))).groupBy('colA')           # no
            df.groupBy('colA').agg(count('colB'))                     # recomended
          2. agg 메서드 안에 작성된 표현식은 Spark Catalyst Optimizer에 의해 최적화되어 실행 계획을 개선 -> 실행 속도 빨라짐
          3. agg 메서드 사용시 표현식의 결과타입을 명시적으로 지정할 수 있음
          4. agg 메서드를 사용하면 여러 개의 집계 함수를 동시에 적용할 수 있음
        4. Action을 통한 Data cacheing
          1. spark 에서 transpormation query는 action 전까지는 수행되지 않음
            • df1 read 두번 읽음
              • df1 = spark.read.format('file')
              • df2 = df1.groupby('colA').count().collect()    =>> df1 read 수행
              • df2 = df1.groupby('colB').count().collect()    =>> df1 read 수행
            • df1 cacheing
              • df1 = spark.read.format('file')
              • print('df1 count: ' + str(df1.count()))   =>> df1 cacheing
              • df2 = df1.groupby('colA').count().collect()    =>> cacheing 이용
              • df2 = df1.groupby('colB').count().collect()    =>> cacheing 이용
            • action: count(), cache(), persist()
              • cache() : 메모리 상황에 따라 일부만 캐싱 됨
              • persist() : 메모리, 디스크를 둘다 캐시 영역으로 지정하기 때문에 전체를 다 강제 캐싱함
        5. 권장 파일포맷, 압축 포맷
          1. parquet : 데이터를 바이너리 파일에, 컬럼 지향 방식으로 저장. 사용하지 않는 데이터를 빠르게 건너뛸 수 있도록 통계를 함께 저장함. hive 와 동시에 사용한다면 orc도 권장
          2. gzip : 분할 가능한 압축 포맷
        6. 파일 크기 관리
          1. spark는 HDFS 기반이라 특히 작은 크기의 파일을 잘 다루지 못함. 파일의 크기가 작으면 read 할때 메타데이터에 관리부하가 발생 함
          2. 나중의 read 작업을 위해 write 작업 시 파일당 5000여개 정도의 row를 포함하는 파일 생성이 best!
    • Dataset
      • 데이터 타입 검증 시기 : 컴파일 타임에서 확인
      • 지원 : JVM 기반의 언어인 스칼라, 자바에서만 지원
      • Row 접근 : Row 타입을 사용자 정의 데이터 타입으로 변환. 사용자에게 더 많은 유연성을 제공하지만 성능 나빠짐
    • SQL 테이블, 뷰
  • UDF : User Defined Function
    • 예제 : pyspark의 DataFrame 에 새로운 컬럼 추가하기
df = sqlContext.createDataFrame(
	[(1, 'a', 23.0), (3, 'B', -23.0)], ('x1','x2','x3')
)

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
	if value == 1: return 'cat1'
    elif value == 2: return 'cat2'
    else: return 'n/a'
    
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn('category', udfValueToCategory('x1'))
df_with_cat.show()

## +---+---+-----+----------+
## | x1| x2|   x3|  category|
## +---+---+-----+----------+
## |  1|  a| 23.0|      cat1|
## |  3|  B|-23.0|       n/a|
## +---+---+----+-----------+
728x90

'IT > spark' 카테고리의 다른 글

PySpark json flatten case  (0) 2023.08.25
PySpark의 UDF 예제  (0) 2023.07.31
Pyspark 예제 실행  (0) 2023.04.19
Apache Spark 설치  (0) 2023.04.19
window 에 scala 설치  (0) 2023.02.08