Sometimes we need to create an Airflow DAG that runs the same set of tasks for multiple different tables (e.g. table_a, table_b, table_c).
We can achieve this by looping over a list of the tables we need to build tasks for. For example:
def _snowflake_poke(**kwargs):
    """Check whether the partition for the execution date has landed in Snowflake."""
    db = 'YOUR_SF_DB'
    table = kwargs['table']
    exec_date = kwargs['ds']
    partition_exists = su.snowflake_poke(SNOWFLAKE_ACCOUNT_NAME,
                                         db,
                                         sql_schema,
                                         SNOWFLAKE_WAREHOUSE_NAME,
                                         SNOWFLAKE_ROLE,
                                         SNOWFLAKE_ENVIRONMENT_NAME,
                                         SNOWFLAKE_CREDS,
                                         table,
                                         exec_date)
    if partition_exists:
        return True
    else:
        raise Exception('SF partition not found. Setting to FAILED.')
def _upsert(table, **kwargs):
    """Run the upsert SQL for the given table and log the pipeline run."""
    execution_date = kwargs['ds_nodash']
    dt = kwargs['ds']
    logging_statement = to_str(PIPELINE_LOGGER_SCRIPT).format(dag_id=dag.dag_id,
                                                              task_id='upsert_' + table,
                                                              execution_date=execution_date)
    sql_commands = to_str(SQL_PIPELINE_DIR + "your_sql_directory/" + table + ".sql")
    sql_commands = sql_commands.format(dt=dt, ps=POSTFIX_STAGE)
    print("Sending the following SQL commands:\n", sql_commands)
    conn = su.connect(SNOWFLAKE_ACCOUNT_NAME,
                      SNOWFLAKE_DB_NAME,
                      sql_schema,
                      SNOWFLAKE_WAREHOUSE_NAME,
                      SNOWFLAKE_ROLE,
                      SNOWFLAKE_ENVIRONMENT_NAME)
    su.execute_dml_statements(conn, sql_commands)
    su.execute_dml_statements(conn, logging_statement)
    su.disconnect(conn)
for table in ['table_a', 'table_b', 'table_c']:
    # Check if records have landed in Snowflake
    sf_poke = PythonOperator(
        task_id='sf_poke_' + table,
        provide_context=True,
        python_callable=_snowflake_poke,
        op_kwargs={'table': table},
        retries=24,
        retry_delay=60 * 30,  # 30 minutes, in seconds
        dag=dag)

    upsert = PythonOperator(
        task_id='upsert_from_' + table,
        provide_context=True,
        python_callable=_upsert,
        op_kwargs={'table': table},
        dag=dag)

    start >> sf_poke >> upsert >> done_upsert
Each pass through the for loop creates a poke task and an upsert task for that table, with the proper dependencies wired between the shared start and done_upsert tasks, so the DAG fans out into one poke >> upsert chain per table and joins back at the end.
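For reference, here is a minimal, self-contained sketch of the same fan-out pattern that you can run on its own. The Snowflake helpers are stubbed out with print statements, and the DAG name, start_date, and the start/done_upsert boundary tasks are illustrative assumptions; in the real pipeline those come from your own project (the su utility module, connection constants, and so on). The imports follow the Airflow 1.x style used above (provide_context, DummyOperator).

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def _poke(table, **kwargs):
    # Placeholder: replace with your partition-existence check against Snowflake.
    print("poking Snowflake for", table, "on", kwargs['ds'])
    return True


def _upsert(table, **kwargs):
    # Placeholder: replace with your upsert SQL execution.
    print("upserting", table, "for", kwargs['ds'])


dag = DAG('per_table_upserts',                     # hypothetical DAG id
          start_date=datetime(2021, 1, 1),
          schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)
done_upsert = DummyOperator(task_id='done_upsert', dag=dag)

for table in ['table_a', 'table_b', 'table_c']:
    sf_poke = PythonOperator(
        task_id='sf_poke_' + table,
        python_callable=_poke,
        op_kwargs={'table': table},
        provide_context=True,
        retries=24,
        retry_delay=timedelta(minutes=30),
        dag=dag)

    upsert = PythonOperator(
        task_id='upsert_from_' + table,
        python_callable=_upsert,
        op_kwargs={'table': table},
        provide_context=True,
        dag=dag)

    start >> sf_poke >> upsert >> done_upsert

Because task_id includes the table name, each iteration yields distinct tasks (sf_poke_table_a, upsert_from_table_a, and so on), which is what lets the three chains coexist in one DAG without ID collisions.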
