Is it possible to create an Airflow DAG programmatically, by using just the REST API?

Background: We have a collection of models, and each model consists of:

- A collection of SQL files that need to be run for the model. The scripts are run through a Python job.py file that takes a script file name as a parameter.
- A JSON file per model which defines the dependencies between the SQL files.

Our models are updated by many individuals, so we need to update our DAG daily. What we have done is create a scheduled Python script that reads all the JSON files and, for each model, creates an in-memory DAG that executes the model's SQL scripts as per the dependencies defined in the JSON config files. What we want is to be able to recreate that DAG within Airflow programmatically, then execute it, rerun failures, etc. I did some research, and per my understanding Airflow DAGs can only be created by using decorators on top of Python files. Is there another approach I missed using the REST API?

Answer:

Airflow DAGs are Python objects, so you can create a DAG factory and use any external data source (a JSON/YAML file, a database, an NFS volume, ...):

- Create a Python script in your dags folder (assume its name is dags_factory.py).
- Create a Python class or method which returns a DAG object (assume it is a method and it is defined as create_dag(config_dict)).
- In the main script, load your file (or any other external data source) and loop over the DAG configs; for each DAG, call globals()[dag_id] = create_dag(dag_id, dag_conf_dict). This step is very important to persist the created DAG and add it to the dag bag. (A sketch of this loop follows the code below.)

So, without going into the details of your JSON file: if you already have a script which creates the DAGs in memory, try to apply those steps, and you will find the created DAGs in the metadata database and the UI.

Two additional notes:

- Airflow runs the DAG file processor every X seconds (configurable), so there is no need to use an API; instead, you can upload your files to S3/GCS or a git repository and load them in the main script before calling the create_dag method.
- Try to improve your JSON schema; for example, scripts can be an array.

For the method create_dag, I will try to simplify the code (according to what I understood from your JSON file):

```python
from datetime import datetime
from json import loads
from airflow import DAG
from airflow.operators.bash import BashOperator

def create_dag(dag_id, dag_conf_dict):
    dag = DAG(dag_id=dag_id, start_date=datetime(2022, 1, 1), schedule_interval=None)  # configure your dag
    # first pass: create one task per script
    tasks = {
        script_name: BashOperator(task_id=script_name, bash_command=f"python job.py {script_name}", dag=dag)
        for script_name in dag_conf_dict
    }
    # second pass: wire the dependencies; create a loop if you have multiple file dependencies
    for script_name, script_conf in dag_conf_dict.items():
        for dependency in script_conf["dependencies"]:
            tasks[dependency] >> tasks[script_name]
    return dag

dag_conf_file = open("dag_conf.json")  # you can load the files from git or S3, I use local storage for testing
dag_conf_dict = loads(dag_conf_file.read())
dag_id = "test_dag"  # read it from the file
globals()[dag_id] = create_dag(dag_id, dag_conf_dict)  # very important: persists the created dag and adds it to the dag bag
```
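The snippet above registers only a single test_dag, while the bullet list mentions looping over multiple DAG configs. Here is a minimal sketch of that loop, continuing the same dags_factory.py file. The conf/ folder layout, the file-name-as-dag-id convention, and the schema shape {"script_b.sql": {"dependencies": ["script_a.sql"]}, ...} are assumptions for illustration, not something specified in the original question:

```python
# A minimal sketch of the "loop over dags configs" step, assuming the
# create_dag method defined above and a hypothetical conf/ folder of
# per-model JSON files next to dags_factory.py.
import glob
import os
from json import loads

conf_dir = os.path.join(os.path.dirname(__file__), "conf")  # assumed layout
for conf_path in glob.glob(os.path.join(conf_dir, "*.json")):
    with open(conf_path) as f:
        dag_conf_dict = loads(f.read())
    # derive the dag_id from the file name, e.g. conf/model_a.json -> "model_a"
    dag_id = os.path.splitext(os.path.basename(conf_path))[0]
    globals()[dag_id] = create_dag(dag_id, dag_conf_dict)  # register every DAG so the processor picks them all up
```

Registering each object in globals() works because the DAG file processor imports the module and collects any DAG objects it finds at module level; a DAG held only in a local variable inside a function would not be discovered.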