목차
[ Composer ]
Composer는 GCP에서 사용하는 서비스이다.
Airflow인데 GCP 내에서는 Composer 로 Airflow를 구동시킨다.
결국 Composer == Airflow 와 동일하다고 볼 수 있다.
GCP에서는 Composer로 작업을 스캐줄할 수 있다.
당연히 Airflow라서 DAG를 생성해야 한다.
이번 글에서는 GCP에서 Dataproc용 Dag를 구성하고 Spark 코드를 실행하는 것을 알아보겠다.
[ DAG, Directed Acyclic Graph ]
DAG는 비순환그래프(Directed Acyclic Graph) 라는 건데 이 구조를 통해서 Airflow 작업은 스케쥴된다.
DAG는 Python으로 작성하면 된다. 이말은 Airflow도 Python으로 동작한다는 의미와 같다.
DAG에는 Operator이 존재하는데 이 Operator들이 job을 하나씩 실행한다.
Bash Operator은 Bash Job을 실행시키고 Python Operator은 Python 스크립트를 실행한다.
그런데 Dataproc Operation을 실행하려면 3가지가 필요하다.
dataproc.DataprocCreateClusterOperator 와 dataproc.DataprocSubmitJobOperator
그리고 dataproc.DataprocDeleteClusterOperator 이다.
왜 3개인지 알아보자
[ DataProc Operator ]
DataProc은 간단하게 하둡을 구성한다고 생각하면 편하다.
하둡은 빅데이터 시스템 입니다. Master가 존재하고 Worker가 존재합니다.
Master가 Worker를 구동시켜서 분산처리를 합니다.
그런데 GCP Composer로 DataProc을 사용하면 비용을 줄일 수 있다.
빅데이터 시스템이 계속 동작하면 비용이 나갈 것 아닌가?
그래서 DataProc을 매번 생성하고 삭제하면서 비용을 아낄 수 도 있다.
위 말을 풀어보면,
dataproc.DataprocCreateClusterOperator 로 DataProc을 생성하고
dataproc.DataprocSubmitJobOperator 로 Spark 작업을 실행하고
dataproc.DataprocDeleteClusterOperator 로 DataProc을 삭제한다.
와 같다.
[ DAG Code 와 Spark Code ]
DataProc을 생성하려면 생성하려는 이미지 버전을 알아야 한다.
지금 사용하려는 이미지는 "2.1.41-debian11" 이다.
이미지 안에는 여러가지 하둡 관련된 라이브러리와 데이터 분석과 관련된 라이브러리가 설치되어 있다.
아래 링크로 가면 해당 이미지에 설치된 라이브러리와 버전을 알 수 있다.
이제 코드를 보자 먼저 DAG Code이다.
DAG Code
from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.utils import trigger_rule
from datetime import datetime
import pendulum
def set_pyspark_job_configure(spark_code, app_name, job_config):
job_configure = {
'pyspark_job': {
'main_python_file_uri': spark_code,
'args': [app_name, job_config]
},
'reference': {'project_id': PROJECT_ID},
'placement': {'cluster_name': CLUSTER_NAME}
}
return job_configure
local_tz = pendulum.timezone("Asia/Seoul")
PROJECT_ID = "MY-GCP-Project"
REGION = "us-west1-a"
BUCKET = "gs://my_gcp_bucket"
DATASET = "TEST_DATASET"
TABLE = "TEST_TABLE"
# cluster setting
# {{ ds-nodash }} 는 오늘 날짜를 넣어준다.
# Machine_type_uri는 이미지를 생성한 머신이다. Machine에 따라 스펙이 다르다.
CLUSTER_NAME = "this-is-dataproc-job-{{ ds_nodash }}"
CLUSTER_CONFIG = {
"gce_cluster_config": {
"subnetwork_uri": "my_sub_network_uri",
"zone_uri": f"{REGION}"
},
"master_config": {
"num_instances": 1,
"machine_type_uri": "n2-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 256}
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n2-standard-8",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 256}
},
"software_config": {
"image_version": "2.1.22-debian11"
},
"initialization_actions": [
{
"executable_file": f"{BUCKET}/dags/shell/install_googlemaps.sh",
},
]
}
# job_config & spark code
JOB_CONFIGURE = {
'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
'job_arguments': [
{'project_id': f'{PROJECT_ID}', 'db_type': 'bigquery', 'dataset': f'{DATASET}', 'table': f'{TABLE}', 'bucket': f'{BUCKET}'},
],
}
SPARK_CODE_ALL = f"{BUCKET}/dags/spark/test_spark_code.py"
JOB_ALL = set_pyspark_job_configure(SPARK_CODE_ALL, 'PROD_ADDR_TO_COUNTRY_FOR_SELLER_ALL', str(JOB_CONFIGURE))
# DAG 정의
with models.DAG(
'real.dag_for_test', # DAG 이름
schedule_interval=None, #'30 10 * * *', # 스케줄링 설정: 매일 오전 10시 30분 / None 은 1회성
start_date=datetime(2023, 8, 10, tzinfo=local_tz), # DAG 시작 날짜 설정
catchup=False, # 과거 작업을 실행하지 않음
) as deg:
create_cluster_task = dataproc.DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
cluster_config=CLUSTER_CONFIG,
region=REGION,
)
addr_to_country_for_seller_all = dataproc.DataprocSubmitJobOperator(
task_id="real_test_job",
job=JOB_ALL,
region=REGION,
project_id=PROJECT_ID
)
delete_cluster_task = dataproc.DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
trigger_rule=trigger_rule.TriggerRule.ALL_DONE, # 이렇든 저렇든 실행한다는 의미
deferrable=True
)
# 태스크 순서 정의: 첫 번째 작업이 성공하면 두 번째 작업으로 진행
create_cluster_task >> addr_to_country_for_seller_all >> delete_cluster_task
Spark Code
from ast import literal_eval
from datetime import timedelta, datetime
from pytz import timezone
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, DoubleType
import logging
import sys
def create_sql(project_id, dataset_name, table_name):
select_query = (
f"""
SELECT
*
FROM `{project_id}.{dataset_name}.{table_name}`
"""
)
return select_query
def read_spark_df(spark, project_id, job_type, db_type, dataset_name, table_name, today):
logging.info(f"job_type: {job_type}")
logging.info(f"Project: {project_id}, DB_Type: {db_type}, Dataset: {dataset_name}, Table: {table_name}")
select_query = create_sql(project_id, dataset_name, table_name)
bq_df = spark.read.format("bigquery") \
.option("project", project_id) \
.option("query", select_query) \
.option("parallelism", 10) \
.option("maxPartitions", 100) \
.load()
return bq_df
def info_call(df):
# 읽어들인 df의 컬럼 개수와 컬럼의 타입을 로그로 남긴다.
logging.info("DataFrame의 크기: {}".format(df.count()))
logging.info("DataFrame의 컬럼 개수: {}".format(len(df.columns)))
logging.info("DataFrame의 컬럼과 데이터 타입:")
for col_name, col_type in df.dtypes:
logging.info("{}: {}".format(col_name, col_type))
logging.info(df.show(10))
return None
def df_withcolumn(df):
# 실제 컬럼명으로 변경한다.
df = df.withColumn("col_1", F.col("col_1").cast(IntegerType()))
df = df.withColumn("col_2", F.col("col_2").cast(StringType()))
df = df.withColumn("col_3", F.col("col_3").cast(StringType()))
return df
# 로그 레벨 설정 (예: WARN, INFO, DEBUG)
logging.basicConfig(level=logging.INFO)
# 쿼리에 사용할 환경변수 선언
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
today = (datetime.now(timezone('Asia/Seoul')) - timedelta(days=0)).strftime("%Y-%m-%d")
yesterday = (datetime.now(timezone('Asia/Seoul')) - timedelta(days=1)).strftime("%Y-%m-%d")
# Dag에서 전달하는 값을 받
app_name = sys.argv[1]
job_config = literal_eval(sys.argv[2])
read_config = job_config.get('job_read_arguments')[0]
write_config = job_config.get('job_write_arguments')[1]
read_project_id = read_config.get('project_id')
read_db_type = read_config.get('db_type')
read_dataset_name = read_config.get('dataset')
read_table_name = read_config.get('table')
read_bucket = read_config.get('bucket')
write_project_id = write_config.get('project_id')
write_db_type = write_config.get('db_type')
write_dataset_name = write_config.get('dataset')
write_table_name = write_config.get('table')
write_bucket = write_config.get('bucket')
# Spark 세션 설정
logging.info(f"Create Spark Session")
spark_session = SparkSession \
.builder \
.appName(app_name) \
.config('spark.debug.maxToStringFields', 500) \
.config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config('spark.sql.legacy.timeParserPolicy', 'CORRECTED') \
.getOrCreate()
# 읽기
logging.info(f"Spark Read Table From BQ")
df = read_spark_df(spark_session, read_project_id, read_db_type, read_dataset_name, read_table_name)
# 생성된 df 정보 확인
info_call(df)
# 필드의 스키마를 고정
logging.info(f"Spark PreProcessing : Fix Schema")
df = df_withcolumn(df)
# DataFrame을 BigQuery 테이블로 저장
logging.info(f"Spark Write to BQ New : {write_project_id}.{write_dataset_name}.{write_table_name}")
df.write \
.format("bigquery") \
.option("temporaryGcsBucket", f"{write_bucket.split('//')[1]}") \
.option("partitionField", "batch_dt") \
.option("partitionType", "DAY") \
.option("parallelism", 10) \
.option("maxPartitions", 100) \
.option("autodetect", "false") \
.option("table", f"{write_project_id}.{write_dataset_name}.{write_table_name}") \
.mode("overwrite") \
.save()
spark_session.stop()
'동굴 속 정보' 카테고리의 다른 글
칼하트 WIP 디트로이트 L 구매와 오씨에스코리아 배송 조회 (0) | 2024.03.12 |
---|---|
성남에서 미취업 청년지원에게 100만원 쏜다 (0) | 2024.03.11 |
경기도 청년지원금 나도 받을 수 있을까? (0) | 2024.03.09 |
메이시스(macys) 주문 성공 주소 만들기 (0) | 2024.03.09 |
괌에서 관광지는 이걸로 종결 (1) | 2024.03.04 |