
Application Development

This guide provides a production-style ETL project example to help beginners quickly understand and learn the complete, engineering-oriented ETL development workflow.

Task Requirements

Given the car information dataset car_price.csv:

  • Correct CarName values that contain the string [dirty data] by removing that substring (an illustrative example follows this list)

  • Remove car records whose price is less than 10000

  • The final schema is car_id, symboling, car_name, price; export the result as JSON files
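For instance (a made-up row, not taken from the actual dataset), a CarName value of alfa-romero giulia[dirty data] should come out of the pipeline as alfa-romero giulia.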

Create the project skeleton with cookiecutter on the command line:

❯ cookiecutter https://github.com/pyloong/cookiecutter-pythonic-project-bigdata-etl
project_name [My Project]: Automotive Data Etl
project_slug [automotive_data_etl]:
project_description [My Awesome Project!]: This is my first etl package, i love it.
author_name [Author]: ming
author_email [ming@example.com]: ming@gmail.com
version [0.1.0]:
Select python_version:
1 - 3.10
2 - 3.9
Choose from 1, 2 [1]:
use_src_layout [y]:
use_poetry [y]:
use_docker [n]:
Select ci_tools:
1 - none
2 - Gitlab
3 - Github
Choose from 1, 2, 3 [1]:
Select use_framework:
1 - none
2 - pyspark
Choose from 1, 2 [1]: 2
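
Once generated, the files this walkthrough touches sit roughly as follows (a partial layout: only the paths discussed below are shown, tmp/ is a working directory you create yourself, and the template generates additional files omitted here):

automotive_data_etl/
├── configs/
│   └── dev.toml
├── src/
│   └── automotive_data_etl/
│       └── tasks/
│           ├── abstract/
│           │   ├── task.py                  # AbstractTask
│           │   └── transform.py             # AbstractTransform
│           └── automotive_task/
│               ├── task.py                  # AutomotiveDataTask
│               └── automotive_transform.py  # AutomotiveDataTransform
└── tmp/
    ├── input/car_price.csv
    └── output/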

Task Class

Create the car data ETL task class AutomotiveDataTask in src/automotive_data_etl/tasks/automotive_task/task.py:

task.py
"""Processing car data task."""
import logging

from pyspark.sql import DataFrame

from automotive_data_etl.tasks.abstract.task import AbstractTask
from automotive_data_etl.tasks.automotive_task.automotive_transform import AutomotiveDataTransform


class AutomotiveDataTask(AbstractTask):
    """Processing car data task."""

    def __init__(self):
        super().__init__()
        self.spark = self.ctx.get_spark_session()

    def _extract(self) -> DataFrame:
        """Read CSV file return DataFrame"""
        df: DataFrame = self.spark.read.csv(
            self.settings.input_path,
            encoding='utf-8',
            header=True,
            inferSchema=True
        )
        self.logger.info(f'Extract data from {self.settings.input_path}')
        return df

    def _transform(self, df: DataFrame) -> DataFrame:
        """execute CarTransform transform function"""
        return AutomotiveDataTransform().transform(df)

    def _load(self, df: DataFrame) -> None:
        """Load final data to output path"""
        df.write.json(self.settings.output_path, mode='overwrite', encoding='utf-8')
        self.logger.info(f'Load data to {self.settings.output_path}')

Every Task goes through an "extract", "transform", and "load" stage, implementing the _extract, _transform, and _load abstract methods of AbstractTask (a sketch of the base class follows the list below):

  • _extract: reads the CSV file tmp/input/car_price.csv
  • _transform: invokes the transform method of the Transform implementation
  • _load: writes the DataFrame as JSON to the tmp/output directory
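
The template generates AbstractTask in src/automotive_data_etl/tasks/abstract/task.py. Its exact generated contents may differ, but conceptually it is a template-method base class along these lines (a minimal sketch; the real class also wires up self.ctx, the runtime context exposing get_spark_session(), and self.settings, the loaded configuration):

"""Abstract ETL task (illustrative sketch)."""
import logging
from abc import ABC, abstractmethod

from pyspark.sql import DataFrame


class AbstractTask(ABC):
    """Base class: run() chains the extract -> transform -> load steps."""

    def __init__(self):
        # self.ctx and self.settings are assumed to be initialized by the
        # template's runtime context; that wiring is omitted from this sketch.
        self.logger = logging.getLogger(self.__class__.__name__)

    def run(self) -> None:
        """Execute the full ETL pipeline."""
        df = self._extract()
        df = self._transform(df)
        self._load(df)

    @abstractmethod
    def _extract(self) -> DataFrame:
        """Read source data and return a DataFrame."""

    @abstractmethod
    def _transform(self, df: DataFrame) -> DataFrame:
        """Apply transformations and return the result."""

    @abstractmethod
    def _load(self, df: DataFrame) -> None:
        """Write the final DataFrame to its destination."""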

Transform Class

Create the car data transform class AutomotiveDataTransform in src/automotive_data_etl/tasks/automotive_task/automotive_transform.py:

automotive_transform.py
"""Car data Transformation."""
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from automotive_data_etl.tasks.abstract.transform import AbstractTransform


class AutomotiveDataTransform(AbstractTransform):
    """Car data Transformation."""

    def transform(self, df: DataFrame) -> DataFrame:
        """Execute the transform process"""
        transformations = (
            self._filter_price,
            self._process_car_name,
            self._select_final_columns,
        )
        return reduce(DataFrame.transform, transformations, df)  # type: ignore

    @staticmethod
    def _filter_price(df: DataFrame) -> DataFrame:
        """Filter results price > 10000"""
        return df.filter(col('price') > 10000)

    @staticmethod
    def _process_car_name(df: DataFrame) -> DataFrame:
        """Clean [dirty data] from CarName"""
        res_df = df.withColumn('CarName', _name_replace_udf(col('CarName')).alias('CarName'))
        return res_df

    @staticmethod
    def _select_final_columns(df: DataFrame) -> DataFrame:
        """Car price data dataframe select final columns"""
        return df.select(
            col('car_ID').alias('car_id'),
            col('symboling'),
            col('CarName').alias('car_name'),
            col('price'),
        )


@udf(returnType=StringType())
def _name_replace_udf(car_name):
    """Clean [dirty data] udf"""
    if not car_name:
        return None
    err_str = '[dirty data]'
    if err_str not in car_name:
        return car_name
    car_name = car_name.replace(err_str, '')
    return car_name
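
As a side note on the UDF above: the same cleanup can also be expressed with Spark's built-in regexp_replace, which avoids the Python UDF serialization overhead. Because the pattern is a regular expression, the brackets must be escaped (a sketch, not part of the generated project):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, regexp_replace


def _process_car_name_builtin(df: DataFrame) -> DataFrame:
    """Strip the literal '[dirty data]' marker with a built-in function."""
    # '[' and ']' are regex metacharacters, so escape them in the pattern.
    return df.withColumn('CarName', regexp_replace(col('CarName'), r'\[dirty data\]', ''))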

The AutomotiveDataTransform class implements the following methods:

  • _filter_price keeps only the rows where price > 10000
  • _process_car_name applies the UDF _name_replace_udf to remove the dirty-data marker [dirty data] from the CarName field
  • _select_final_columns selects and returns the car_id, symboling, car_name, and price columns
  • transform runs the processing pipeline and returns the result, completing the car data transformation (a quick local check is sketched after this list)
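
To sanity-check the transform without running the full task, you can feed it a tiny hand-built DataFrame (the rows below are made up for illustration; the column names mirror car_price.csv):

from pyspark.sql import SparkSession

from automotive_data_etl.tasks.automotive_task.automotive_transform import AutomotiveDataTransform

spark = SparkSession.builder.master('local[*]').getOrCreate()
df = spark.createDataFrame(
    [
        (1, 3, 'alfa-romero giulia[dirty data]', 13495.0),
        (2, 1, 'toyota corolla', 8000.0),  # dropped: price is not > 10000
    ],
    ['car_ID', 'symboling', 'CarName', 'price'],
)
AutomotiveDataTransform().transform(df).show()
# Expected: a single row (1, 3, 'alfa-romero giulia', 13495.0)
# with columns car_id, symboling, car_name, price.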

Configuration

Add the following settings to the configuration file. Because the project uses the dev environment configuration by default, append them to configs/dev.toml:

# spark configs
spark_master = 'local[*]'
spark_config.spark.driver.memory = '3G'
spark_config.spark.executor.memory = '16G'
spark_config.spark.sql.debug.maxToStringFields = 100
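
For reference, the generated context object typically feeds these values into the SparkSession builder in roughly this way (an illustrative sketch, not the template's actual code):

from pyspark.sql import SparkSession

# Values mirroring configs/dev.toml above.
spark_master = 'local[*]'
spark_config = {
    'spark.driver.memory': '3G',
    'spark.executor.memory': '16G',
    'spark.sql.debug.maxToStringFields': '100',
}

builder = SparkSession.builder.master(spark_master).appName('automotive_data_etl')
for key, value in spark_config.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()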