Airflow – Create Multiple Tasks With List Comprehension and Reuse A Single Operator

Sometimes we need to create an Airflow DAG that runs the same task for several different tables (e.g. table_a, table_b, table_c).

We can achieve this by iterating over a list containing each table we need to build a task for and reusing the same operator definition on every iteration. For example:

def _snowflake_poke_ccpa(**kwargs):
    """Check via ``su.snowflake_poke`` that data exists for this run.

    Reads ``table`` and ``ds`` from the keyword arguments and forwards them,
    together with the module-level Snowflake settings, to
    ``su.snowflake_poke``.

    Returns:
        True when ``su.snowflake_poke`` returns a truthy value.

    Raises:
        Exception: when the poke result is falsy.
    """
    database = 'YOUR_SF_DB'
    target_table = kwargs['table']
    run_date = kwargs['ds']

    found = su.snowflake_poke(
        SNOWFLAKE_ACCOUNT_NAME,
        database,
        sql_schema,
        SNOWFLAKE_WAREHOUSE_NAME,
        SNOWFLAKE_ROLE,
        SNOWFLAKE_ENVIRONMENT_NAME,
        SNOWFLAKE_CREDS,
        target_table,
        run_date,
    )

    # Guard clause: a missing partition is reported by raising.
    if not found:
        raise Exception('SF partition not found. Setting to FAILED.')
    return True


def _upsert(table, **kwargs):
    """Run the upsert SQL for ``table`` and then write a pipeline log entry.

    Args:
        table: Table name; selects ``your_sql_directory/<table>.sql`` and is
            used to build the task id recorded by the logging statement.
        **kwargs: Keyword arguments supplied by the caller; ``ds`` and
            ``ds_nodash`` are read from them.

    Raises:
        KeyError: if ``ds`` or ``ds_nodash`` is missing from ``kwargs``.
    """
    execution_date = kwargs['ds_nodash']
    dt = kwargs['ds']

    # The logged task id must match the id of the operator that runs this
    # callable ('upsert_from_<table>'); the previous 'unspert_' prefix was a
    # typo that made log rows impossible to join back to the task.
    # NOTE(review): to_str presumably loads the text at the given path -- confirm.
    logging_statement = to_str(PIPELINE_LOGGER_SCRIPT).format(
        dag_id=dag.dag_id,
        task_id='upsert_from_' + table,
        execution_date=execution_date)

    sql_commands = to_str(SQL_PIPELINE_DIR + "your_sql_directory/" + table + ".sql")
    sql_commands = sql_commands.format(dt=dt, ps=POSTFIX_STAGE)
    print("Sending the following SQL commands:\n", sql_commands)

    conn = su.connect(SNOWFLAKE_ACCOUNT_NAME,
                      SNOWFLAKE_DB_NAME,
                      sql_schema,
                      SNOWFLAKE_WAREHOUSE_NAME,
                      SNOWFLAKE_ROLE,
                      SNOWFLAKE_ENVIRONMENT_NAME)
    try:
        su.execute_dml_statements(conn, sql_commands)
        # Log only after the upsert statements ran without raising.
        su.execute_dml_statements(conn, logging_statement)
    finally:
        # Always release the connection, even when a statement fails.
        su.disconnect(conn)

# Build one poke -> upsert task pair per table and wire each pair between the
# shared `start` and `done_upsert` tasks.
for table in ['table_a', 'table_b', 'table_c']:
    # Check if records have landed in SF
    sf_poke = PythonOperator(
        task_id='sf_poke_' + table,
        provide_context=True,
        # Must reference the function actually defined in this file.
        python_callable=_snowflake_poke_ccpa,
        op_kwargs={'table': table},
        retries=24,
        retry_delay=60 * 30,  # 30 minutes
        dag=dag)

    upsert = PythonOperator(
        task_id='upsert_from_' + table,
        provide_context=True,
        python_callable=_upsert,
        op_kwargs={'table': table},
        dag=dag)

    # Chain the operators created in this iteration.
    start >> sf_poke >> upsert >> done_upsert

The for loop creates a poke task and an upsert task for each table, with the proper dependencies between them, like so:

Leave a Reply