고기 대신 SW 한점/Public Cloud

Glue, Athena를 활용한 Data 정합성 구성

지식한점 2023. 3. 7. 16:41
반응형

마이크로서비스간 데이터 동기화를 위해 Kafka를 활용한 CDC 구현과 그 구현의 데이터 정합성을 검사하기 위해서 RDS PostgreSQL의 Table을 S3로 복사하고 이를 Athena로 쿼리할 수 있도록 구성하는 방법을 설명합니다.

먼저 타깃 DB로의 데이터를 옮길 필요가 있을 때 흔히 CDC와 함께 고려되는 것은 ETL 솔루션인데 그것이 어떤 것인지 간단히 알아보도록 합시다.
CDC와 ETL은 소스 DB의 데이터를 선별해 타깃 DB로 옮긴다는 방식에서는 같지만, 두 솔루션의 용도와 목적은 판이하게 다릅니다.

CDC Changed Data Capture의 약자로 데이터베이스의 변경된 내용만 추출하여 다른 시스템 간 데이터 복제 및 활용을 할 수 있게 하는 솔루션입니다. 반면 ETL은 추출, 변환, 적재(Extract, transform, load)의 의미를 담고 있습니다. 즉 다양한 소스 시스템으로부터 필요한 데이터를 추출(extract) 하여 변환(transformation) 작업을 거쳐 저장하거나 분석을 담당하는 시스템으로 전송 및 적재(loading) 하는 모든 과정을 포함하고 있습니다.

 

 

CDC vs ETL 장단점 비교

  • CDC
    • 운영 환경에서 큰 부하 없이 DB 복제 가능
    • 장애 발생 시 일부 데이터에 대한 정합성 보장이 어려움
  • ETL
    • 한 번에 많은 데이터를 처리하므로 데이터 변환 및 정합성 확보에 유리
    • 소스 DB 부하를 고려하면 실시간 처리가 어려움

AWS Glue
  - 2017년부터 제공
  - 완전 관리형 ETL 서비스
  - 대표기능
     --데이터를 읽어와 자동으로 table scheme을 생성해주는 Data Catalog
     --데이터 수집 및 변환 코드를 자동으로 생성하고 실행해주는 ETL Job (Apache Spark 기반)

 

AWS S3
  - 높은 내구성과 가용성, 확장성
  - 보안과 컴플라이언스
  - 유연한 관리: Tiering 및 lifecycle 정책 적용 가능

 

AWS Athena
  - 표준 SQL을 사용해서 Amazon S3의 데이터를 분석하는 대화식 쿼리 서비스
  - 서버리스 서비스로서 설정 및 관리해야 할 인프라가 없음
  - 쿼리 실행에 대해서만 지불하며, 압축을 통해서 쿼리당 30~90% 비용 절감 가능

  1. JDBC Crawler: DB에서 AS-IS 테이블들의 Schema를 자동으로 읽어서 Data Catalog에 저장합니다.
  2. JDBC Crawler: DB에서 TO-BE 테이블들의 Schema를 자동으로 읽어서 Data Catalog에 저장합니다.
  3. ETL Jobs: 미리 설정된 시간에 trigger에 의해 실행되며, Data Catalog를 참조하여 DB 테이블을 S3에 복사합니다.
  4. S3 Crawler: S3의 데이터를 읽어서 자동으로 생성한 테이블 Schema를 Data Catalog에 저장합니다.
  5. Athena: Data Catalog의 테이블 Schema를 이용하여 S3의 테이블 데이터를 SQL 쿼리로 조회합니다.

기대효과

  • 완전 관리형 서비스인 Glue와 Athena 도입으로 인프라 운영 부담
  • 서버리스 형태인 Glue와 Athena를 사용하여 유휴 비용을 최소화
  • S3에 복제된 데이터를 활용하므로 운영중인 워크로드에 대한 영향 없이 정합성 분석 가능
  • 부하가 적인 시간대에 일대사를 통해 S3에 데이터를 복제 
  • S3 암호화를 통해 저장된 데이터 보호
  • S3에서 Amazon Macie를 사용하면 중요한 데이터를 대규모로 검색하고 보호 가능

제약사항

  • 실시간적으로 정합성을 확인하기 어려움 
  • 상세 분석을 위해서는 표준 SQL과 python에 대한 지식이 필요 

 

사전준비

 

데이터 검증을 위한 PoC S3 Bucket 구조

  • DataSync 앱 테이블 : <BUCKET_NAME>/datasync/<TABLE_NAME>
  • SubNWrite 앱 테이블 : <BUCKET_NAME>/subnwrite/<TABLE_NAME>
  • Athena 쿼리 output 저장소 : <BUCKET_NAME>/athena/output<TABLE_NAME>

 

Glue Data Catalog 구성하기

  • DB의 데이터를 S3로 이전하기 위해서 source DB의 Schema 정보가 필요합니다. 
  • Glue Data Catalog 기능을 활용하여 자동으로 DB Schema 가져올 수 있습니다. 

 

Database 추가하기

  • 콘솔에서 AWS Glue에 접속합니다. (https://console.aws.amazon.com/glue)
  • Database를 선택하고 Add database 버튼을 클릭합니다. 
  • Database 이름에 적당한 이름(예. jdbc)을 입력하고 Create합니다. 

 

Connection 추가하기

  • Schema를 가져오기 위한 Crawler에서 사용할 connection을 생성합니다. 
  • 왼쪽 메뉴에서 Connection을 선택하고 Add connection을 클릭합니다.
  • Connection 이름을 입력하고, connection type으로는 Amazon RDS를 선택한 후, Database engine(여기서는 Amazon Aurora)을 선택하고 Next를 클릭합니다.
  • 데이터스토어 설정 화면에서 Instance, Database name, username, password을 입력하고 Next를 클릭한 후 Finish를 클릭합니다. (DB에 대한 Security Group 정보를 설정해야 함 :
    Setting up a VPC to connect to JDBC data stores for AWS Glue - AWS Glue )
  • 새로 생성한 connection이 보이면, Test connection을 클릭하여 접속에 성공하는지 확인합니다. 

 

 

Crawler 추가하기

  • 왼쪽 메뉴에서 Crawlers을 선택하고 Add crawler을 클릭합니다.
  • Crawler 이름에 적당한 이름(예. from_rds_datasync)을 입력하고 Next를 클릭합니다.
  • Crawler source type으로 Data Store를 선택하고 Next를 클릭합니다.
  • Add a data store 화면에서 data store로 JDBC를 선택하고, connection으로 위에서 생성한 connection을 선택한 후 include path에 database 이름과 스키마를 입력하고 Next를 클릭합니다. 
  • 추가로 스키마를 가져올 테이블이 있으면 Add another data store를 Yes를 선택하여 추가합니다.
  • 다음으로 IAM Role을 선택한 후, Frequency는 우선 편의를 위해 Run on demand를 선택합니다. 
  • Crawler’s output 설정 화면에서 위에서 생성한 database를 선택하고 Next를 클릭합니다. 
  • ‘Run it now?’ 링크를 클릭하거나 crawler를 선택하여 실행합니다. 

 

  • 약 2분이 지나면 crawling이 완료되며,  성공 메시지를 확인합니다. 
  • 이제 Databases > Tables로 이동하면, 테이블들이 보입니다. 

Crawler 스케쥴링

  • PoC 환경에서는 아래 화면과 같이 매일 새벽 1시(16:00 UTC+0)에 RDS 스키마를 읽어오도록 구성했습니다.
  • 참고로, 나중에 설정할 Athena용 S3 테이블 스키마는 매일 새벽 4시로 설정되어 있는 것을 볼 수 있습니다.

 

Glue ETL 구성하기

  • Data Catalog에서 자동으로 구성한 Schema를 이용해서 데이터를 S3로 이전하기 위해서 Spark Job을 생성합니다. 
  • Glue는 Data Catalog의 메타데이터를 활용하여 Python 또는 Scala script를 자동으로 생성해 줍니다. 
  • 자동으로 생성된 Script를 수정하여 다양한 부가적인 처리를 할 수 있습니다. 

 

Spark Jobs

 

  • Job은 테이블 마다 하나씩 생성합니다. 
  • Job properties 항목에서 아래와 같이 입력 후 Next를 클릭합니다. 
    • Name: 적당한 이름을 입력 (예. pt_la_album_img)
    • IAM role: 위에서 생성한 role을 선택
    • Type: Spark (그대로 유지)
    • Glue Version: Spark 2.4, Python 3 (Glue Version 2.0) (그대로 유지)
    • 나머지는 값들은 초기 값 그대로
  • Data source 항목에서 source 테이블을 선택
  • Transform type은 Change schema를 선택
  • Data Target 항목에서는 ‘Create tables in your data target’을 선택한 후 아래와 같이 입력하고 Next를 클릭합니다. 
    • Data store: Amazon S3
    • Format: Parquet
    • Target path: <테이블 별 저장 path 
  • ‘Save job and edit script’ 버튼을 클릭합니다. 
  • 아래와 같은 python 스크립트가 자동으로 생성됩니다. 
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
.......................................................
..................................................

 

  • 자동 생성된 스크립트를 반복 실행하는 경우 같은 데이터가 중복되므로 아래와 같이 target S3 path를 비우는 구문을 추가합니다. 
...

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## S3 Delete
glueContext.purge_s3_path("s3://verification-daily-0215/datasync/xxx", {"retentionPeriod":0})

...
  • Script editor 화면에서 script 내용을 확인한 후 Save와 Run job을 클릭하여 job을 실행합니다.
  • 우측 상단의 ‘X’ 버튼을 클릭하여 editor를 종료합니다. 
  • 위와 같는 방법으로 to-be 테이블 3개를 포함하여 총 13개의 테이블에 대해서 Job을 생성합니다.

 

Triggers

  • 이제 ETL Job을 매일 새벽 01시 20분에 자동으로 실행하기 위해서 trigger를 구성합니다. 
  • 메뉴에서 Trigger를 선택한 후 Add trigger를 클릭합니다. 
  • trigger를 추가합니다. 

 

Athena로 쿼리하기

  • Athena는 data source로 Data Catalog에 등록된 테이블을 지원합니다. 
  • 이를 위해 Data Catalog의 crawler를 이용해 Athena에서 사용할 schema를 추가로 생성합니다. 
  • 이후 정합성 검증을 위한 쿼리를 실행하여 ad-hoc으로 결과를 확인할 수 있습니다. 
  • 결과 열람을 위한 테이블을 추가로 생성해서 QuickSight나 Grafana 등의 dashboard와 연결할 수 있습니다. 

Data Catalog를 이용하여 Table 준비하기

  • Athena에서 Glue ETL Job을 통해 S3로 이전된 데이터를 쿼리하기 위해서 crawler를 이용하여 테이블을 추가 생성합니다. 
  • Crawler 메뉴에서 Add crawler를 클릭한 후 아래와 예시와 같은 설정으로 새로운 crawler를 추가합니다.

 

 

  • 같은 방법으로 subnwrite 테이블들에 대한 crawler도 생성합니다.

 

Athena에서 테이블 내용 확인

  • Athena > Query Editor로 이동 후, 좌측 메뉴에서 Data Source와 Database를 선택하면 위에서 추가한 테이블 목록이 표시됩니다.
  • 테이블 이름 우측의 context menu(⋮)를 클릭하여 Preview Table을 수행할 수 있습니다.


 

 

 

 

 

 

반응형