Notice
Recent Posts
Recent Comments
Link
250x250
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | |
7 | 8 | 9 | 10 | 11 | 12 | 13 |
14 | 15 | 16 | 17 | 18 | 19 | 20 |
21 | 22 | 23 | 24 | 25 | 26 | 27 |
28 | 29 | 30 |
Tags
- 임업후계자
- 리소스
- 정책자금
- 성능개선
- S3
- python
- 양도소득세
- 세금계산서
- AWS
- 매입불공제
- 농지연금
- 인덱싱
- Resolver
- boto3
- 경매
- kubectl
- 공매
- route53
- serverless
- 외국납부세액공제
- 금융소득
- command
- Filter
- 신탁공매
- lambda
- 산지연금
- OpenSearch
- node
- Kubernetes
- pod
Archives
- Today
- Total
진지한 개발자
PySpark 특징 및 장점 본문
728x90
- 스파크에서 사용되는 분산 콜렉션 API
- DataFrame : DataFrame is like a partitioned table.
- Row 타입의 레코드와 칼럼으로 구성 됨
- DataFrame 의 파티셔닝은 DataFrame이 클러스터에서 물리적으로 배치되는 형태를 정의 (물리적 분산저장)
- 데이터 타입 검증 시기 : 런타임에서 확인
- 지원 : 모두 지원
- Row 접근 : Row 타입을 직렬화된 바이너리 구조로 변환. 스파크의 최적화된 내부 포맷: 일반적으로 가장 빠름
- DataFrame의 최적화
- sortWithinPartitions : 트랜스포메이션 처리 전에 파티션별 정렬을 수행하여 DataFrame 을 최적화 함
- spark.read.load('file.json').sortWithinPartitions('count')
- repartition
- repartition 메서드를 호출하면 전체 데이터를 셔플 함
- 이후 작업이 추가실행되면서 Spark 내부 옵티마이저는 최적의 성능을 위해 파티션 구성을 동적으로 조정하기 때문
- 사용자가 지정한 파티션 구성이 유지되길 원한다면, 사용자가 명시적 지정 파티션 구성 실행 이후에 cache() 또는 persist() 메서드를 호출하여 결과를 캐시해야 함
- repartition은 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야 함. 특정 컬럼을 기준으로 자주 필터링한다면 자주 필터링 되는 컬럼 기준으로 파티션을 재분배하는 것이 좋음
- 셔플링 작업이 최소화되도록 Executor 노드 간의 통신이 적어지도록 할수록 효율적
- 리파티션을 적용할 컬럼으로 executor 노드에 적절히 분배될 수 있는 컬럼을 골라서 executor 노드의 개수와 컬럼 안의 unique value 의 개수가 비슷할수록 좋음
- COUNTING practice
- count 함수 사용 할때는
select 구문에 표현식으로 작성하는 것이 아니라 agg 메서드에 작성하는게 좋음
df.select(count(col("colB"))).groupBy('colA') # no
df.groupBy('colA').agg(count('colB')) # recomended - agg 메서드 안에 작성된 표현식은 Spark Catalyst Optimizer에 의해 최적화되어 실행 계획을 개선 -> 실행 속도 빨라짐
- agg 메서드 사용시 표현식의 결과타입을 명시적으로 지정할 수 있음
- agg 메서드를 사용하면 여러 개의 집계 함수를 동시에 적용할 수 있음
- count 함수 사용 할때는
- Action을 통한 Data cacheing
- spark 에서 transpormation query는 action 전까지는 수행되지 않음
- df1 read 두번 읽음
- df1 = spark.read.format('file')
- df2 = df1.groupby('colA').count().collect() =>> df1 read 수행
- df2 = df1.groupby('colB').count().collect() =>> df1 read 수행
- df1 cacheing
- df1 = spark.read.format('file')
- print('df1 count: ' + str(df1.count())) =>> df1 cacheing
- df2 = df1.groupby('colA').count().collect() =>> cacheing 이용
- df2 = df1.groupby('colB').count().collect() =>> cacheing 이용
- action: count(), cache(), persist()
- cache() : 메모리 상황에 따라 일부만 캐싱 됨
- persist() : 메모리, 디스크를 둘다 캐시 영역으로 지정하기 때문에 전체를 다 강제 캐싱함
- df1 read 두번 읽음
- spark 에서 transpormation query는 action 전까지는 수행되지 않음
- 권장 파일포맷, 압축 포맷
- parquet : 데이터를 바이너리 파일에, 컬럼 지향 방식으로 저장. 사용하지 않는 데이터를 빠르게 건너뛸 수 있도록 통계를 함께 저장함. hive 와 동시에 사용한다면 orc도 권장
- gzip : 분할 가능한 압축 포맷
- 파일 크기 관리
- spark는 HDFS 기반이라 특히 작은 크기의 파일을 잘 다루지 못함. 파일의 크기가 작으면 read 할때 메타데이터에 관리부하가 발생 함
- 나중의 read 작업을 위해 write 작업 시 파일당 5000여개 정도의 row를 포함하는 파일 생성이 best!
- sortWithinPartitions : 트랜스포메이션 처리 전에 파티션별 정렬을 수행하여 DataFrame 을 최적화 함
- Dataset
- 데이터 타입 검증 시기 : 컴파일 타임에서 확인
- 지원 : JVM 기반의 언어인 스칼라, 자바에서만 지원
- Row 접근 : Row 타입을 사용자 정의 데이터 타입으로 변환. 사용자에게 더 많은 유연성을 제공하지만 성능 나빠짐
- SQL 테이블, 뷰
- DataFrame : DataFrame is like a partitioned table.
- UDF : User Defined Function
- 예제 : pyspark의 DataFrame 에 새로운 컬럼 추가하기
df = sqlContext.createDataFrame(
[(1, 'a', 23.0), (3, 'B', -23.0)], ('x1','x2','x3')
)
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
elif value == 2: return 'cat2'
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn('category', udfValueToCategory('x1'))
df_with_cat.show()
## +---+---+-----+----------+
## | x1| x2| x3| category|
## +---+---+-----+----------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+----+-----------+
728x90
'IT > spark' 카테고리의 다른 글
PySpark json flatten case (0) | 2023.08.25 |
---|---|
PySpark의 UDF 예제 (0) | 2023.07.31 |
Pyspark 예제 실행 (0) | 2023.04.19 |
Apache Spark 설치 (0) | 2023.04.19 |
window 에 scala 설치 (0) | 2023.02.08 |