Notice
Recent Posts
Recent Comments
Link
250x250
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
31 |
Tags
- OpenSearch
- 금융소득
- 리소스
- AWS
- Resolver
- lambda
- 인덱싱
- 성능개선
- Kubernetes
- 산지연금
- kubectl
- 외국납부세액공제
- pod
- serverless
- node
- route53
- 신탁공매
- 세금계산서
- 매입불공제
- Filter
- 경매
- 양도소득세
- 농지연금
- 임업후계자
- 정책자금
- python
- 공매
- S3
- boto3
- command
Archives
- Today
- Total
진지한 개발자
Pyspark 사용예 (HDFS) 본문
728x90
from pyspark.sql import SparkSession
# Spark 세션 생성
spark = SparkSession.builder \
.appName("ETL Example") \
.getOrCreate()
# 1. 데이터 추출 (Extract)
# HDFS에서 CSV 파일 읽기
input_path = "hdfs://namenode:8020/path/to/input/data.csv"
df = spark.read.csv(input_path, header=True, inferSchema=True)
# 2. 데이터 변환 (Transform)
# 예: 특정 열의 값을 변경하거나 새로운 열 추가
transformed_df = df.withColumn("new_column", df["existing_column"] * 2) \
.filter(df["filter_column"] > 100)
# 3. 데이터 적재 (Load)
# 변환된 데이터를 HDFS에 저장 (Parquet 형식으로 저장)
output_path = "hdfs://namenode:8020/path/to/output/transformed_data.parquet"
transformed_df.write.parquet(output_path, mode="overwrite")
# Spark 세션 종료
spark.stop()
728x90
'IT > spark' 카테고리의 다른 글
PySpark Preprocessing (0) | 2023.08.25 |
---|---|
PySpark json flatten case (0) | 2023.08.25 |
PySpark의 UDF 예제 (0) | 2023.07.31 |
PySpark 특징 및 장점 (0) | 2023.07.31 |
Pyspark 예제 실행 (0) | 2023.04.19 |