본문 바로가기
동굴 속 정보

GCP에서 Composer로 Dataproc 실행

by 도시형닌자 2024. 3. 10.

 

목차

     

    [ Composer ]

    Composer는 GCP에서 사용하는 서비스이다.

    Airflow인데 GCP 내에서는 Composer 로 Airflow를 구동시킨다. 

    결국 Composer == Airflow 와 동일하다고 볼 수 있다.

     

    GCP에서는 Composer로 작업을 스캐줄할 수 있다.

    당연히 Airflow라서 DAG를 생성해야 한다.

     

    이번 글에서는 GCP에서 Dataproc용 Dag를 구성하고  Spark 코드를 실행하는 것을 알아보겠다.

    [ DAG, Directed Acyclic Graph ]

    DAG는 비순환그래프(Directed Acyclic Graph) 라는 건데 이 구조를 통해서 Airflow 작업은 스케쥴된다. 

    DAG는 Python으로 작성하면 된다. 이말은 Airflow도 Python으로 동작한다는 의미와 같다.

     

    DAG에는 Operator이 존재하는데 이 Operator들이 job을 하나씩 실행한다.

    Bash Operator은 Bash Job을 실행시키고 Python Operator은 Python 스크립트를 실행한다.

     

    그런데 Dataproc Operation을 실행하려면 3가지가 필요하다.

    dataproc.DataprocCreateClusterOperator 와 dataproc.DataprocSubmitJobOperator

    그리고 dataproc.DataprocDeleteClusterOperator 이다.

     

    왜 3개인지 알아보자

    [ DataProc Operator ]

    DataProc은 간단하게 하둡을 구성한다고 생각하면 편하다.

    하둡은 빅데이터 시스템 입니다. Master가 존재하고 Worker가 존재합니다.

    Master가 Worker를 구동시켜서 분산처리를 합니다.

     

    그런데 GCP Composer로 DataProc을 사용하면 비용을 줄일 수 있다.

    빅데이터 시스템이 계속 동작하면 비용이 나갈 것 아닌가?

    그래서 DataProc을 매번 생성하고 삭제하면서 비용을 아낄 수 도 있다.

     

    위 말을 풀어보면,

    dataproc.DataprocCreateClusterOperator 로 DataProc을 생성하고 

    dataproc.DataprocSubmitJobOperator 로 Spark 작업을 실행하고

    dataproc.DataprocDeleteClusterOperator 로 DataProc을 삭제한다.

    와 같다.

    [ DAG Code 와 Spark Code ]

    DataProc을 생성하려면 생성하려는 이미지 버전을 알아야 한다.

    지금 사용하려는 이미지는 "2.1.41-debian11" 이다. 

     

    이미지 안에는 여러가지 하둡 관련된 라이브러리와 데이터 분석과 관련된 라이브러리가 설치되어 있다.

    아래 링크로 가면 해당 이미지에 설치된 라이브러리와 버전을 알 수 있다.

     

     

    이제 코드를 보자 먼저 DAG Code이다.

    DAG Code

    from airflow import models
    from airflow.providers.google.cloud.operators import dataproc
    from airflow.utils import trigger_rule
    from datetime import datetime
    import pendulum
    
    
    def set_pyspark_job_configure(spark_code, app_name, job_config):
        job_configure = {
            'pyspark_job': {
                'main_python_file_uri': spark_code,
                'args': [app_name, job_config]
            },
            'reference': {'project_id': PROJECT_ID},
            'placement': {'cluster_name': CLUSTER_NAME}
        }
        return job_configure
    
    
    local_tz = pendulum.timezone("Asia/Seoul")
    
    PROJECT_ID = "MY-GCP-Project"
    REGION = "us-west1-a"
    BUCKET = "gs://my_gcp_bucket"
    DATASET = "TEST_DATASET"
    TABLE = "TEST_TABLE"
    
    # cluster setting
    # {{ ds-nodash }} 는 오늘 날짜를 넣어준다.
    # Machine_type_uri는 이미지를 생성한 머신이다. Machine에 따라 스펙이 다르다.
    CLUSTER_NAME = "this-is-dataproc-job-{{ ds_nodash }}"
    CLUSTER_CONFIG = {
        "gce_cluster_config": {
            "subnetwork_uri": "my_sub_network_uri",
            "zone_uri": f"{REGION}"
        },
        "master_config": {
            "num_instances": 1,
            "machine_type_uri": "n2-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 256}
        },
        "worker_config": {
            "num_instances": 2,
            "machine_type_uri": "n2-standard-8",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 256}
        },
        "software_config": {
            "image_version": "2.1.22-debian11"
        },
        "initialization_actions": [
            {
                "executable_file": f"{BUCKET}/dags/shell/install_googlemaps.sh",
            },
        ]
    }
    
    # job_config & spark code
    JOB_CONFIGURE = {
        'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
        'job_arguments': [
            {'project_id': f'{PROJECT_ID}', 'db_type': 'bigquery', 'dataset': f'{DATASET}', 'table': f'{TABLE}', 'bucket': f'{BUCKET}'},
        ],
    }
    
    SPARK_CODE_ALL = f"{BUCKET}/dags/spark/test_spark_code.py"
    
    JOB_ALL = set_pyspark_job_configure(SPARK_CODE_ALL, 'PROD_ADDR_TO_COUNTRY_FOR_SELLER_ALL', str(JOB_CONFIGURE))
    
    # DAG 정의
    with models.DAG(
        'real.dag_for_test',  # DAG 이름
        schedule_interval=None, #'30 10 * * *',  # 스케줄링 설정: 매일 오전 10시 30분 / None 은 1회성
        start_date=datetime(2023, 8, 10, tzinfo=local_tz),  # DAG 시작 날짜 설정
        catchup=False,  # 과거 작업을 실행하지 않음
    ) as deg:
    
        create_cluster_task = dataproc.DataprocCreateClusterOperator(
            task_id="create_cluster",
            project_id=PROJECT_ID,
            cluster_name=CLUSTER_NAME,
            cluster_config=CLUSTER_CONFIG,
            region=REGION,
        )
    
        addr_to_country_for_seller_all = dataproc.DataprocSubmitJobOperator(
            task_id="real_test_job",
            job=JOB_ALL,
            region=REGION,
            project_id=PROJECT_ID
        )
    
        delete_cluster_task = dataproc.DataprocDeleteClusterOperator(
            task_id="delete_cluster",
            project_id=PROJECT_ID,
            cluster_name=CLUSTER_NAME,
            region=REGION,
            trigger_rule=trigger_rule.TriggerRule.ALL_DONE, # 이렇든 저렇든 실행한다는 의미
            deferrable=True
        )
    
    
        # 태스크 순서 정의: 첫 번째 작업이 성공하면 두 번째 작업으로 진행
        create_cluster_task >> addr_to_country_for_seller_all >> delete_cluster_task

    Spark Code

    from ast import literal_eval
    from datetime import timedelta, datetime
    from pytz import timezone
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, DoubleType
    import logging
    import sys
    
    
    def create_sql(project_id, dataset_name, table_name):
        select_query = (
            f"""
                SELECT 
                    *
                FROM `{project_id}.{dataset_name}.{table_name}`
                """
        )
        return select_query
    
    
    def read_spark_df(spark, project_id, job_type, db_type, dataset_name, table_name, today):
        logging.info(f"job_type: {job_type}")
        logging.info(f"Project: {project_id}, DB_Type: {db_type}, Dataset: {dataset_name}, Table: {table_name}")
    
        select_query = create_sql(project_id, dataset_name, table_name)
    
        bq_df = spark.read.format("bigquery") \
            .option("project", project_id) \
            .option("query", select_query) \
            .option("parallelism", 10) \
            .option("maxPartitions", 100) \
            .load()
    
        return bq_df
    
    
    def info_call(df):
        # 읽어들인 df의 컬럼 개수와 컬럼의 타입을 로그로 남긴다.
        logging.info("DataFrame의 크기: {}".format(df.count()))
        logging.info("DataFrame의 컬럼 개수: {}".format(len(df.columns)))
        logging.info("DataFrame의 컬럼과 데이터 타입:")
        for col_name, col_type in df.dtypes:
            logging.info("{}: {}".format(col_name, col_type))
        logging.info(df.show(10))
        return None
    
    
    def df_withcolumn(df):
        # 실제 컬럼명으로 변경한다.
        df = df.withColumn("col_1", F.col("col_1").cast(IntegerType()))
        df = df.withColumn("col_2", F.col("col_2").cast(StringType()))
        df = df.withColumn("col_3", F.col("col_3").cast(StringType()))
        return df
    
    
    
    # 로그 레벨 설정 (예: WARN, INFO, DEBUG)
    logging.basicConfig(level=logging.INFO)
    
    # 쿼리에 사용할 환경변수 선언
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    today = (datetime.now(timezone('Asia/Seoul')) - timedelta(days=0)).strftime("%Y-%m-%d")
    yesterday = (datetime.now(timezone('Asia/Seoul')) - timedelta(days=1)).strftime("%Y-%m-%d")
    
    # Dag에서 전달하는 값을 받
    app_name = sys.argv[1]
    job_config = literal_eval(sys.argv[2])
    read_config = job_config.get('job_read_arguments')[0]
    write_config = job_config.get('job_write_arguments')[1]
    
    read_project_id = read_config.get('project_id')
    read_db_type = read_config.get('db_type')
    read_dataset_name = read_config.get('dataset')
    read_table_name = read_config.get('table')
    read_bucket = read_config.get('bucket')
    
    write_project_id = write_config.get('project_id')
    write_db_type = write_config.get('db_type')
    write_dataset_name = write_config.get('dataset')
    write_table_name = write_config.get('table')
    write_bucket = write_config.get('bucket')
    
    # Spark 세션 설정
    logging.info(f"Create Spark Session")
    spark_session = SparkSession \
        .builder \
        .appName(app_name) \
        .config('spark.debug.maxToStringFields', 500) \
        .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
        .config('spark.sql.parquet.writeLegacyFormat', 'true') \
        .config('spark.sql.legacy.timeParserPolicy', 'CORRECTED') \
        .getOrCreate()
    
    # 읽기
    logging.info(f"Spark Read Table From BQ")
    df = read_spark_df(spark_session, read_project_id, read_db_type, read_dataset_name, read_table_name)
    
    # 생성된 df 정보 확인
    info_call(df)
    
    # 필드의 스키마를 고정
    logging.info(f"Spark PreProcessing : Fix Schema")
    df = df_withcolumn(df)
    
    
    # DataFrame을 BigQuery 테이블로 저장
    logging.info(f"Spark Write to BQ New : {write_project_id}.{write_dataset_name}.{write_table_name}")
    df.write \
        .format("bigquery") \
        .option("temporaryGcsBucket", f"{write_bucket.split('//')[1]}") \
        .option("partitionField", "batch_dt") \
        .option("partitionType", "DAY") \
        .option("parallelism", 10) \
        .option("maxPartitions", 100) \
        .option("autodetect", "false") \
        .option("table", f"{write_project_id}.{write_dataset_name}.{write_table_name}") \
        .mode("overwrite") \
        .save()
    
    
    spark_session.stop()