아카이브

[메타코드m] <Spark를 활용한 데이터 엔지니어링 이론 + 실습 강의> 2주차 본문

공모전 및 대외활동/앰배서더

[메타코드m] <Spark를 활용한 데이터 엔지니어링 이론 + 실습 강의> 2주차

머루아빠승우 2024. 12. 25. 23:54

안녕하세요! 여러분,

오늘은 2주차 스파크 엔지니어링 강좌 2주차 후기 글입니다.

 

현재 메타코드에서 Spark를 활용한 데이터 엔지니어링

초급과정 내용을 공유하고 있어요!

Spark를 배워서 데이터 엔지니어로 성장하고 싶으신 분들이라면

메타코드에서 성장하시면 좋을 것 같습니다~

 

 

[New] Spark를 활용한 Data Engineering 입문 실습 강의 | 네카라쿠배 현직자 강사

 

metacodes.co.kr


실습

Databricks의 환경에서 학습이 됩니다.

해당 환경은 아파치 스파크 환경을 작동할 수 있는 클라우드 기반 환경이에요.

기본적으로 Vscode를 사용해본 분들이라면, 어떤 느낌인지 쉽게 아실 것 같습니다.

강사님이 이렇게 중간 에러가 나는 원리를 설명해주시는데,

알다 싶이 코딩 실력, 짬(?)은 이런 에러 문자를 보고

해결하는 방식을 이해하면서 발전한다고 생각해요.

 

혼자서는 어렵지만, 강사님이 직관적으로 설명해주시고

본인 스스로도 납득할 수 있는 내용으로 알려주시기에, 학습에 큰 도움이 됩니다.


Key값을 바꾸지 않고 Value만 바꾸는 구조가 좋은 이유라든지

실제로 어떤 Action을 사용해서 나타나는 결과를 정리할지

그리고 데이터를 저장하는 방식을 이해하고 경로를 나누는 등, 

스파크를 배우면서 익혀야할 직관을 알려주십니다.

이렇게 수료증까지 취득 완료했습니다!

짧은 시간 동안 공부 집중하려니 조금 쉽지 않았던 것 같아요.

무엇보다 종강하고나서 바로 약속이랑, 공부랑 병행하려니

그냥 강의를 듣고만 있던 시간도 많이 있습니다.😅

 

다시 공부하고 싶을 때가 생길 것 같아 장학생이 종료된 뒤,

해당 강의는 재구매하려구요!

 

아래는 공부하면서 작성한 스터디 노트입니다.


하둡과 스파크 차이점

처리 모델

  • Hadoop은 MapReduce 모델을 사용하며, 데이터를 디스크 기반으로 처리합니다.
  • Spark는 Resilient Distributed Datasets (RDDs)와 유향 비순환 그래프(DAG) 실행 엔진을 사용하여 메모리 내 처리를 수행합니다3.

성능

  • Spark는 일반적으로 Hadoop보다 훨씬 빠릅니다. 특히 반복적이고 대화형 작업에서 100배까지 빠를 수 있습니다
  • Spark의 인메모리 처리 방식은 디스크 I/O를 줄여 처리 속도를 크게 향상시킵니다.

데이터 처리 방식

  • Hadoop은 주로 배치 처리에 적합합니다.
  • Spark는 실시간 데이터 처리, 반복적 알고리즘, 대화형 쿼리 등 다양한 유형의 데이터 처리를 지원합니다.

사용 편의성

  • Spark는 Java, Python, Scala, R 등 다양한 프로그래밍 언어를 지원하여 개발자 친화적입니다.
  • Hadoop MapReduce는 상대적으로 복잡하고 프로그래밍이 어려울 수 있습니다.

비용

  • Hadoop은 일반적으로 더 경제적인 옵션입니다.
  • Spark는 대용량 RAM을 필요로 하므로 클러스터 비용이 더 높을 수 있습니다

스파크 특징

RDD (Resilient Distributed Dataset)

복구 가능한 분산 데이터

RDD는 Spark의 핵심 데이터 모델로, 다음과 같은 특징을 가집니다:

  • 분산 저장된 데이터 요소들의 집합
  • 병렬 처리 가능
  • 장애 발생 시 자체 복구 기능

RDD의 복구에는 감동이 있다.

  • 손상된 RDD의 복구는 기록된 RDD의 생성 과정을 이용해서 복구함
  • 생성된 RDD는 변경되지 않는다 (불변성) RDD가 변경되는 과정을 기억하지 않고, 변경 시 새로운 RDD를 만들어 생성 과정을 기록 A를 만들 때 a가 필요했어, 이후 변경을 하려하면 a를 기반으로 A를 만들고 B로 변경하여 사용
  • In - memory 여야하는 이유가 있네 속도도 빠르고

RDD의 생성 방법:

  1. sc.parallelize(): 기존 컬렉션(예: 리스트)에서 RDD 생성
  2. sc.textFile(): 외부 데이터 소스(예: 텍스트 파일)에서 RDD 생성
  3. 기존 RDD에 변환 연산(Transformation) 적용

생성한 RDD는 어디에 저장할까?

  • 로컬 파일 시스템
  • HDFS

파티션 :

  • RDD 데이터가 클러스터를 구성하는 여러 서버에 나누어 저장될 때 데이터 단위를 의미함

Job과 Executor의 관계

  • Job: Spark에서 Job은 사용자가 제출한 전체 Spark 프로그램을 의미 하나의 Job은 여러 개의 작은 태스크(tasks)로 나뉘어 실행
  • Executor: Executor는 클러스터의 각 워커 노드에서 실행되는 프로세스 태스크를 실행하고 데이터를 메모리나 디스크에 저장하는 역할

트랜스포메이션과 액션

  • 트랜스포메이션 RDD의 형태 변환 (기존 RDD를 이용해 새 RDD 형성)
  • Action : RDD의 결과로 RDD를 만드는 것이 아닌 다른 타입의 데이터 형성하는 것

스파크 컨텍스트

  • 스파크 애플리케이션과 클러스터를 연결하는 객체
  • 스파크컨텍스트생성방법 sc = SparkContext(master=”local”, appName=”name”,conf=conf) # master = ’yarn’

RDD에서의Partition 수 지정

  • rdd1 = sc.parallelize( data, number of partitions)

RDD 기본 액션

collect : RDD의 모든 원소를 모아서 배열로 변환 (여러개의 rdd를 모아버려)

count : RDD의 전체 원소 개수 반환

 

RDD 변형

Transformation (RDD→New RDD )

  1. Map : RDD의 모든 원소에 적용하여 새로운 RDD 정의
    • rdd.map(): 각 요소에 함수를 적용하여 새로운 RDD 생성
    1. 각 그룹은 키와 해당 키에 속한 원소(value)의 시퀀스로 구성
      1. d 0 list(x[1]) 1 list(x[1])
      2. x → [("even", [2, 4, 6, 8, 10]), ("odd", [1, 3, 5, 7, 9])]
      3. list를 안하면 Iterator 객체 자체가 출력됨 (예: <pyspark.resultiterable.ResultIterable object>)그룹화 연산
      4. groupBykey() : RDD의 구성이 키와 쌍으로 이루어진 경우에 사용 가능!!
      5. rdd.groupBy() : 여러 개의 그룹을 생성
    • rdd1에 속하거나 rdd2에 속하는 원소
  2. 필터와 정렬 연산
    rdd1 = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
    res = rdd1.pipe("cut -f 1,3 -d ,")
    print(res.collect())
    
    -b : 바이트 단위임 cut - b 1-3 은 1,2 , 4,5, 7,8 출력됨
    -c : 문자 단위 cut -c 3-7 은 3번째부터 7번째 문자 출력
    -d : 구분자를 지정하여 설정 -d , 는 ,단위로 컷
    -f : 렬단위 데이터 추출 : 구분자로 분리된 텍스트에서 특정 필드 선택
    ( -d , 로 ,단위 구분 
    
    1 / 2 / 3
    1 / 2 / 3
    4 / 5 / 6
    7 / 8 / 9
    
    1,3 선택 -> 1,3, 4,6 7,9
    
    rdd.filter(): 조건에 맞는 요소만 선택하여 새로운 RDD 생성
    rdd = sc.parallelize(range(1,101))
    
    # 인자 1 중복여부 Flase 중복 금지 랜덤 추출
    # 인자 1 중복여부 True 중복 허용 랜덤 추출 -> 전체 개수가 
    res1 = rdd.sample(False, 0.5) # 0.5 는 확률임 몇개가 추출될지는 모름
    res2 = rdd.sample(True, 1.5) # 1.5는 확률 다 동등하게 1.5%로 등장함
    print(res1.take(5), res1.count()) # 5개만 가져옴
    print(res2.take(5), res2.count()) # 5개만 가져옴
    
    결과가 리스트 형태가 
    
    6. 파티션 연산
    • 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침
    • 셔플링이 발생하지 않음
    • 예시:
      • map()
      • filter()
      • flatMap()
    넓은 변환(Wide Transformation)
    • 여러 파티션의 데이터가 섞여 새로운 파티션을 생성
    • 데이터 셔플링이 발생
    • 예시:
      • groupBy()
      • groupByKey()
      • reduceByKey()
      • distinct()
      • cartesian()
      • subtract()
    distinct() Transformation
    • Wide Transformation에 해당합니다
    • 모든 파티션의 데이터를 확인하여 중복을 제거해야 하므로 셔플링(shuffling)이 발생합니다
    • 결과 RDD는 원본 RDD보다 크기가 작거나 같습니다.
    cartesian() Transformation
    • Wide Transformation에 해당합니다
    • 두 RDD의 모든 가능한 요소 쌍을 생성하므로 대규모 데이터 셔플링이 발생합니다
    • 결과 RDD는 원본 RDD들보다 크기가 커집니다
    # distinct() 예시
    
    rdd = sc.parallelize([1, 1, 2, 3])
    distinct_rdd = rdd.distinct()
    
    # 결과: [1, 2, 3]
    
    # cartesian() 예시
    
    rdd1 = sc.parallelize([1, 2])
    rdd2 = sc.parallelize(['a', 'b'])
    cartesian_rdd = rdd1.cartesian(rdd2)
    # 결과: [(1,'a'), (1,'b'), (2,'a'), (2,'b')]
    

    RDD Action : RDD를 가지고 새로운 데이터 타입으로 형성하는 매소드

    first() : 첫번째 원소 추출
    • 반환 값이 리스트 형태인 것이 sample()과의 차이점!
    • 액션이기 때문에 새로운 형태의 데이터로 출력됨 transport는 그대로 RDD 형태임
    형태가 딕셔너리 형태임필터와 정렬 연산
    rdd1 = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
    res = rdd1.pipe("cut -f 1,3 -d ,")
    print(res.collect())
    
    -b : 바이트 단위임 cut - b 1-3 은 1,2 , 4,5, 7,8 출력됨
    -c : 문자 단위 cut -c 3-7 은 3번째부터 7번째 문자 출력
    -d : 구분자를 지정하여 설정 -d , 는 ,단위로 컷
    -f : 렬단위 데이터 추출 : 구분자로 분리된 텍스트에서 특정 필드 선택
    ( -d , 로 ,단위 구분 
    
    1 / 2 / 3
    1 / 2 / 3
    4 / 5 / 6
    7 / 8 / 9
    
    1,3 선택 -> 1,3, 4,6 7,9
    
    rdd.filter(): 조건에 맞는 요소만 선택하여 새로운 RDD 생성
    rdd = sc.parallelize(range(1,101))
    
    # 인자 1 중복여부 Flase 중복 금지 랜덤 추출
    # 인자 1 중복여부 True 중복 허용 랜덤 추출 -> 전체 개수가 
    res1 = rdd.sample(False, 0.5) # 0.5 는 확률임 몇개가 추출될지는 모름
    res2 = rdd.sample(True, 1.5) # 1.5는 확률 다 동등하게 1.5%로 등장함
    print(res1.take(5), res1.count()) # 5개만 가져옴
    print(res2.take(5), res2.count()) # 5개만 가져옴
    
    결과가 리스트 형태가 
    
    6. 파티션 연산
    • 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침
    • 셔플링이 발생하지 않음
    • 예시:
      • map()
      • filter()
      • flatMap()
    넓은 변환(Wide Transformation)
    • 여러 파티션의 데이터가 섞여 새로운 파티션을 생성
    • 데이터 셔플링이 발생
    • 예시:
      • groupBy()
      • groupByKey()
      • reduceByKey()
      • distinct()
      • cartesian()
      • subtract()
    distinct() Transformation
    • Wide Transformation에 해당합니다
    • 모든 파티션의 데이터를 확인하여 중복을 제거해야 하므로 셔플링(shuffling)이 발생합니다
    • 결과 RDD는 원본 RDD보다 크기가 작거나 같습니다.
    cartesian() Transformation
    • Wide Transformation에 해당합니다
    • 두 RDD의 모든 가능한 요소 쌍을 생성하므로 대규모 데이터 셔플링이 발생합니다
    • 결과 RDD는 원본 RDD들보다 크기가 커집니다
    # distinct() 예시
    
    rdd = sc.parallelize([1, 1, 2, 3])
    distinct_rdd = rdd.distinct()
    
    # 결과: [1, 2, 3]
    
    # cartesian() 예시
    
    rdd1 = sc.parallelize([1, 2])
    rdd2 = sc.parallelize(['a', 'b'])
    cartesian_rdd = rdd1.cartesian(rdd2)
    # 결과: [(1,'a'), (1,'b'), (2,'a'), (2,'b')]
    

대박 중요!

rdd = sc.parallelize(range(1,11), 3) 은 3개 덩어리를 나눈다는 것!

1,2,3,/ 4,5,6 / 7,8,9,10 or 1,2,3,4 / 5,6,7 / 8,9,10 이렇게 될지

모른다! 정해지지 않음

RDD 데이터의 저장과 불러오기

RDD → Hadoop

Hadoop → RDD

스파크 지원 데이터 형식

  • 하둡 api 기반으로 다양한 형태 데이터 지원

텍스트 파일 저장 및 로드 방법

# 저장 
rdd.saveAsTextFile("path")
#로드
rdd = sc.textFile("path")

시퀀스 파일의 저장 및 로드

  • 시퀀스 파일이란 키와 값으로 구성된 데이터가 저장된 바이너리 형식 (쌍)
  • 하둡에서 자주 사용되는 대량 데이터 처리에 적합한 분할 압축 기능을 포함한 파일 관리 구조