일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
31 |
- 외국납부세액공제
- Kubernetes
- Filter
- 금융소득
- OpenSearch
- route53
- 농지연금
- boto3
- S3
- 양도소득세
- 세금계산서
- 산지
- 신탁공매
- 임업후계자
- 클러스터
- command
- 산지연금
- 이자
- 성능개선
- AWS
- kubectl
- lambda
- 공매
- serverless
- python
- 정책자금
- 인덱싱
- 매입불공제
- Resolver
- 경매
- Today
- Total
목록IT/spark (9)
진지한 개발자
from pyspark.sql import SparkSession# Spark 세션 생성spark = SparkSession.builder \ .appName("ETL Example") \ .getOrCreate()# 1. 데이터 추출 (Extract)# HDFS에서 CSV 파일 읽기input_path = "hdfs://namenode:8020/path/to/input/data.csv"df = spark.read.csv(input_path, header=True, inferSchema=True)# 2. 데이터 변환 (Transform)# 예: 특정 열의 값을 변경하거나 새로운 열 추가transformed_df = df.withColumn("new_column", df["existing_colu..
# convert each row in DataFrame to list of integer df.col_2 = df.col2.map(lambda x: [int(e) for e in x]) df_spark = spark.createDataFrame(df) df_spark.select('col_1', explode(col('col_2')).alias('cols_2')).show(10)
data1 = spark.read.parquet(path) json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema data2 = data1.withColumn("data", from_json("json_col", json_schema)) col1 = datat2.columns col1.remove("data") col2 = data2.select("data.*").columns append_str = "data." col3 = [append_str + val for val in col2] col_list = col1 + col3 data3 = data2.select(*col_list).drop("json_col")
from pyspark.sql.functions import lit df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) df_with_x4 = df.withColumn("x4", lit(0)) df_with_x4.show() ## +---+---+-----+---+ ## | x1| x2| x3| x4| ## +---+---+-----+---+ ## | 1| a| 23.0| 0| ## | 3| B|-23.0| 0| ## +---+---+-----+---+ from pyspark.sql.functions import exp df_with_x5 = df_with_x4.withColumn("x5", exp(..
스파크에서 사용되는 분산 콜렉션 API DataFrame : DataFrame is like a partitioned table. Row 타입의 레코드와 칼럼으로 구성 됨 DataFrame 의 파티셔닝은 DataFrame이 클러스터에서 물리적으로 배치되는 형태를 정의 (물리적 분산저장) 데이터 타입 검증 시기 : 런타임에서 확인 지원 : 모두 지원 Row 접근 : Row 타입을 직렬화된 바이너리 구조로 변환. 스파크의 최적화된 내부 포맷: 일반적으로 가장 빠름 DataFrame의 최적화 sortWithinPartitions : 트랜스포메이션 처리 전에 파티션별 정렬을 수행하여 DataFrame 을 최적화 함 spark.read.load('file.json').sortWithinPartitions('cou..
spark 실행 확인 cd $SPARK_HOME ./bin/pyspark 위의 경우 worker UI 화면 접근 가능 예제 : 워드 카운트 lines = sc.textFile("README.md") lines.count() lines.first() pythonLines = lines.filter(lambda line : "Python" in line) pyrhonLines.first() python 파일 생성 from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local").setAppName("test") sc = SparkContext(conf=conf) lines = sc.textFile("./README.md") pr..
1. Java, scala 설치 # sudo apt update # sudo apt-get upgrade sudo apt install default-jdk scala2. Apache Spark 다운로드 wget https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz 3. Spark /opt 경로에 풀기 sudo mkdir /opt/spark sudo tar -xf spark*.tgz -C /opt/spark --strip-component 1 sudo chmod -R 777 /opt/spark 4. 시스템 경로에 Java, Spark 폴더 추가 echo "export JAVA_HOME=/usr/lib/jvm..
기본세팅 spark 와 java 호환 버전 확인 https://docs.scala-lang.org/overviews/jdk-compatibility/overview.html Java 설치 wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.rpm sudo rpm -ivh jdk-17_linux-x64_bin.rpm sudo alternatives --config java java -version scala 설치 Coursier : JVM 및 기본 Scala 툴을 자동 설치 해 줌 설치경로 : https://github.com/coursier/launchers/raw/master/cs-x86_64-pc-win32.zip ammonite..
기본세팅 spark 와 java 호환 버전 확인 The Scala 3.x series supports JDK 8, as well as 11 and beyond. https://docs.scala-lang.org/overviews/jdk-compatibility/overview.html Java(17) 설치 Correto 17 설치 sudo yum install java-17-amazon-corretto Java 환경변수 설정 (~/.bashrc 에 작성후 source ~/.bashrc 적용 # Java set export JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto export PATH=$JAVA_HOME/bin:$PATH export CLASS_PATH=$JAVA..