使用 Airflow 编排查询
本教程将介绍通过 Apache Airflow DAG 的开发以使用 Apache Drill 实现基本的 ETL 流程。在编写和测试我们的新 DAG 之前,我们将使用 pip 将 Airflow 安装到 Python virtualenv 中。参考 Airflow installation documentation 获得有关安装 Airflow 的更多信息。
在本教程中,我将在 Debian Linux 机器上使用 shell 运行指令,在其他平台上进行简单转换后也可实现相同的功能。
准备开始
- 安装高于 3.6 版本的 Python,包括 pip 和可选的 virtualenv。
- 在一个有查询权限和新增存储权限的主机上安装 Drill,我将以嵌入式模式运行 Drill 1.19。
(可选) 配置 virtualenv
创建并激活一个新的名为 “airflow” 的 virtualenv。如有必要,给你的环境调整 Python 解释器路径和 virtualenv 目标路径。
VIRT_ENV_HOME=~/.local/lib/virtualenv
virtualenv -p /usr/bin/python3 $VIRT_ENV_HOME/airflow
. $VIRT_ENV_HOME/airflow/activate
安装 Airflow
如果你阅读了安装指南,就会看到 Airflow 项目提供了约束文件,将 Python 的依赖对应到稳定版本。在大多数情况下,不需约束文件即可正常工作,但为了可重复,我们将使用脚本来适配不同 Python 版本的约束文件。
AIRFLOW_VERSION=2.1.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-0airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-apache-drill
初始化 Airflow
因为只是教程示例,因此我们为 Airflow 设置一个本地 SQLite 数据库,并为我们自己添加一个管理员用户。
# 可选: 将 Airflow 的数据目录从默认值更改为 ~/airflow
export0 AIRFLOW_HOME=~/Development/airflow
mkdir -p ~/Development/airflow/
# 为 Airflow 创建一个新的 SQLite 数据库
airflow db init
# 添加管理员用户
airflow users create \
--username admin \
--firstname FIRST_NAME \
--lastname LAST_NAME \
--role Admin \
--email admin@example.org \
--password admin
配置 Drill 连接
目前,我们已有一个可以正常运行的 Airflow。通过指令 airflow webserver
启动 Web UI 并在浏览器输入 http://localhost:8080。点击 Admin -> Connections。添加名为 drill_tutorial
的 Drill 的连接,根据 Drill 的环境调整设置。如果你使用嵌入式模式启动 Drill,那么你需要以下配置。
Setting | Value |
---|---|
Conn Id | drill_tutorial |
Conn Type | Drill |
Host | localhost |
Port | 8047 |
Extra | {“dialect_driver”: “drill+sadrill”, “storage_plugin”: “dfs”} |
请注意,必须在 Extra
字段中指定 sqlalchemy-drill 类型和驱动程序信息。参考 the sqlalchemy-drill documentation 获得有关其配置的更多信息.
保存新连接后,可以使用 ctrl+c 关闭 Airflow Web UI。
探索源数据
只有了解源数据后,才可以建立有效的 ETL 流程。让我们从源数据中获取前一百万行的样本做初步了解。
curl -s https://data.cdc.gov/api/views/vbim-akqf/rows.csv\?accessType\=DOWNLOAD | pv -lSs 1000000 > /tmp/cdc_covid_cases.csvh
你可以替换 pv -lSs 1000000
为 head -n1000000
或者直接浏览文件。使用网络浏览器下载也可以。请注意,对于 Drill,使用扩展名 .csvh
保存文件对接下来的步骤很重要,因为查询CSV文件时会自动设置 extractHeader = true
,并确保此文件包含该设置。
下面对 Drill 进行操作,区别于将所有的操作列出来,我只列出运行的查询以及相对应的结果:
select * from dfs.tmp.`cdc_covid_case.csvh`
-- 1. 在日期字段中,空字符串 '' 可以转换为 SQL NULL
-- 2. 年龄组可以分为两个数字字段,最后一个组是无界的。
select age_group, count() from dfs.tmp.`cdc_covid_case.csvh` group by age_group;
select sex, count() from dfs.tmp.`cdc_covid_case.csvh` group by sex;
select race_ethnicity_combined, count() from dfs.tmp.`cdc_covid_case.csvh` group by race_ethnicity_combined;
-- 3. 字符串 'Missing' 可以转换为 SQL NULL
-- 4. 'NA' 也将转换为 NULL
-- 5. Race_ethnicity_combined 可以分为两个字段,但在本教程中我们将保持原样。
select hosp_yn, count() from dfs.tmp.`cdc_covid_case.csvh` group by hosp_yn;
-- 6. 除了 'Missing 之外,指标变量还有三个可能的值,所以它们不能转换为可为空的布尔值。
下面介绍如何创建 ETL 流程。
开发 CTAS (Create Table As Select) ETL 流程
drop table if exists dfs.tmp.cdc_covid_cases;
create table dfs.tmp.cdc_covid_cases as
with missing2null as (
select
nullif(cdc_case_earliest_dt, '') cdc_case_earliest_dt,
nullif(cdc_report_dt, '') cdc_report_dt,
nullif(pos_spec_dt, '') pos_spec_dt,
nullif(onset_dt, '') onset_dt,
case when current_status not in ('Missing', 'NA') then current_status end current_status,
case when sex not in ('Missing', 'NA') then sex end sex,
case when age_group not in ('Missing', 'NA') then age_group end age_group,
case when race_ethnicity_combined not in ('Missing', 'NA') then race_ethnicity_combined end race_ethnicity_combined,
case when hosp_yn not in ('Missing', 'NA') then hosp_yn end hosp_yn,
case when icu_yn not in ('Missing', 'NA') then icu_yn end icu_yn,
case when death_yn not in ('Missing', 'NA') then death_yn end death_yn,
case when medcond_yn not in ('Missing', 'NA') then medcond_yn end medcond_yn
from
dfs.tmp.`cdc_covid_cases.csvh`),
age_parse as (
select
*,
regexp_replace(age_group, '([0-9]+)[ \-\+]+([0-9]*) Years', '$1') age_min_incl,
regexp_replace(age_group, '([0-9]+)[ \-\+]+([0-9]*) Years', '$2') age_max_excl
from
missing2null)
select
cast(cdc_case_earliest_dt as date) cdc_case_earliest_dt,
cast(cdc_report_dt as date) cdc_report_dt,
cast(pos_spec_dt as date) pos_spec_dt,
cast(onset_dt as date) onset_dt,
current_status,
sex,
age_group,
cast(age_min_incl as float) age_min_incl,
1 + cast(case when age_max_excl = '' then 'Infinity' else age_max_excl end as float) age_max_excl,
race_ethnicity_combined,
hosp_yn,
icu_yn,
death_yn,
medcond_yn
from
age_parse;
这是一个重要的 SQL 语句,涵盖了相当多的转换工作,并最终输出为 Parquet 列存格式,高效并清晰的表示数据集,非常适合分析或机器学习。然而我们可以再进一步:
- 我们没有在 ETL 的复选框和向导中隐藏配置。
- 我们不必在开始探索和测试转换时给 SQL 添加另一种语言。
- 我们不必担心性能或如何并行处理我们的数据流,因为这方面优化应该交给 Drill 处理。
此外,虽然 SQL 不是最简洁的语言,但我们的 ETL 流程清晰且可维护。我相信几个月后,一个精通 SQL 的同事,在没有文档的情况下,依然可以读懂并修改代码。这些优点可以方便对代码进行扩展。
接下来,请将上面的 CTAS 脚本保存到新文件中 $AIRFLOW_HOME/dags/cdc_covid_cases.drill.sql
。双文件扩展名只是处理 SQL 脚本类型和语言的一种方式,可以根据自己的习惯自行选择。
开发 Airflow DAG
我们的 DAG 定义保存在单个 Python 脚本中。该脚本的完整列表将在后面列出。请将此脚本保存到一个新文件中 $AIRFLOW_HOME/dags/drill_tutorial.py
。
'''
使用 Apache Drill 来转换、加载和报告 COVID 案例。
数据来源引用:
疾病控制和预防中心,COVID-19 报告。COVID-19 病历监控公共数据访问,摘要和限制。
https://data.cdc.gov/Case-Surveillance/COVID-19-Case-Surveillance-Public-Use-Data/vbim-akqf
'''
from datetime import timedelta
from airflow import DAG
# 使用 PythonOperator 从 CDC 网站缓存 COVID-19 CSV 文件
from airflow.operators.python import PythonOperator
# 使用 DrillOperators 启动对 COVID-19 数据的查询
from airflow.providers.apache.drill.operators.drill import DrillOperator
from airflow.utils.dates import days_ago
# 假设请求是存在的,因为 sqlalchemy-drill 需要这些请求
import requests
# 传递这些参数
# 每个新任务可以传递新的参数
default_args = {
'owner': 'Joe Public',
'depends_on_past': False,
'email': ['joe@public.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def stage_from_www(src_url, tgt_path):
'''
使用 Request 库将病例监控数据从 CDC 下载到本地。如果是分布式环境,用 HDFS、S3 等替换本地文件系统。
另一种选择是使用 Drill 提供的 HTTP 存储插件来直接从数据源获取数据。
请注意:避免在内存中缓存大量数据集(启用流式响应)
'''
resp = requests.get(
src_url,
stream=True # don't buffer big datasets in memory
)
with open(tgt_path) as f:
f.write(resp.content)
with DAG(
'drill_tutorial',
default_args=default_args,
description='Drill tutorial that loads COVID-19 case data from the CDC.',
schedule_interval=timedelta(weeks=2), # 数据源每两周更新一次
start_date=days_ago(0),
) as dag:
# 将此模块的字符串用于 DAG 的文档 (在 Web UI 中可视化)。
dag.doc_md = __doc__
# 第一个任务是使用 PythonOperator 从 CDC 网站获取 CSV 数据。
stage_from_www_task = PythonOperator(
task_id='stage_from_www',
python_callable=stage_from_www,
op_kwargs= {
'src_url': 'https://data.cdc.gov/api/views/vbim-akqf/rows.csv?accessType=DOWNLOAD',
'tgt_path': '/tmp/cdc_covid_cases.csvh'
}
)
stage_from_www.doc = 'Download COVID case CSV data from the CDC using ' \
'an HTTP GET'
# 第二个任务是通过 DrillOperator 的外部脚本执行 CTAS 的 ETL 流程。
# 也可以指定 SQL,并跨任务拆分此多语句的 SQL 脚本。也就是将初始 DROP TABLE 设为单独的任务。
ctas_etl_task = DrillOperator(
drill_conn_id='drill_tutorial',
task_id='ctas_etl',
sql='cdc_covid_cases.drill.sql'
)
ctas_etl_task.doc = 'Recreate dfs.tmp.cdc_covid_cases using CTAS'
# 第三个任务是通过 DrillOperator 生成每日案例计数报告。
# 将报告由 dfs.tmp 格式转换为可读的 CSV 格式, 使用 Airflow 可以用多种方式实现。
daily_count_report_task = DrillOperator(
drill_conn_id='drill_tutorial',
task_id='drill_report',
sql='''
set `store.format` = 'csv';
drop table if exists dfs.tmp.cdc_daily_counts;
create table dfs.tmp.cdc_daily_counts as
select
cdc_case_earliest_dt,
count(*) as case_count
from
dfs.tmp.cdc_covid_cases
group by
cdc_case_earliest_dt
order by
cdc_case_earliest_dt;
'''
)
daily_count_report_task.doc = 'Report daily case counts to CSV'
# 指定 DAG 的依赖关系。
stage_from_www_task >> ctas_etl_task >> daily_count_report_task
age_parse;
手动启动 Airflow DAG
可以通过解释器运行 DAG 脚本来测试 Python 语法。
python3 $AIRFLOW_HOME/dags/drill-tutorial.py
如果一切正常,Python 将无错误退出,接着确认 Drillbit 运行正常, 然后启动 airflow 测试 DAG。
airflow dags test drill_tutorial $(date +%Y-%m-%d)
将 COVID 案例数据集下载到本地一段时间后,可以看到在 Drill 上执行的所有查询都通过 sqlalchemy-drill 记录到控制台。DAG 执行后会产生两个文件。
- 一个 Parquet 格式的数据集位于
$TMPDIR/cdc_covid_cases
。 - 一个 CSV 格式的每日监控的病例数报告位于
$TMPDIR/cdc_daily_counts
。
在 Drill 中使用 OLAP 查看第一个文件,然后在电子表格或文本编辑器中查看第二个文件。
目前为止,你已使用 Apache Airflow 和 Apache Drill 构建了 ETL 流程!
下一步
- 了解 Airflow scheduling 后台运行 scheduler 以使任务自动运行。
- 调整 DAG 使用其他数据源。如果你在自己的环境中拥有数据库、文件和 Web 服务,那么这些数据源是很好的选择。你也可以在线查看更多公共数据集和 API。
- 尝试通过将 CTAS 指向带有日期的子目录来给现有的数据集添加新分区。
- 留意现有流程中的数据处理步骤,包括那些不是严格意义上 ETL 的步骤,应该交由 Drill 来处理。
感谢完成本教程,希望你喜欢上 Drill!