Mengenal Airflow Fundamental
Oleh Yurio Windiatmoko (Dosen Sains Data Terapan PLAI BMD)

Bagaimana cara kerja Airflow? Apa saja konsep-konsep dasarnya?
Pada dasarnya, Warung Pintar menggunakan Apache Airflow sebagai Workflow Management System (WMS). Hal ini didasari oleh kebutuhan sistem manajemen alur kerja dengan menyediakan berbagai fitur dan fungsi yang lengkap serta dinamis. Selain itu, Airflow pun sangat fleksibel dan dapat digunakan pada banyak domain.
Seperti yang sudah disampaikan sebelumnya, Airflow merupakan Workflow Management System, di mana Airflow dapat difungsikan untuk,
- Mengelola penjadwalan dan menjalankan task untuk pipeline data (Kurang lebih bisa disebut sebagai The 21st Century of Mario Bros ‘Data Engineer’.)
The 21st Century of Mario Bros ‘Data Engineer‘.
- Memastikan task berurutan dengan benar berdasarkan dependensinya (Untuk catatan, task bisa berupa perintah apapun, tidak melulu merupakan sebuah pipeline data.)
- Mengelola alokasi sumber daya melalui penjadwalan resource dengan mematikan dan menghidupkan resource mesin melalui task perintah.
- Menyediakan mekanisme untuk melacak kondisi berbagai task dan memulihkannya dari kegagalan task.
Berbagai Implementasi Airflow.
Nah, secara garis besar, artikel ini bertujuan untuk memberi gambaran detail kepada kalian yang ingin memahami komponen Airflow dan bagaimana Airflow beroperasi.
Komponen Utama pada Airflow
Komponen Utama Airflow.
Terdapat beberapa komponen utama pada Airflow, yaitu,
- Webserver: Proses ini menjalankan aplikasi Flask sederhana dengan gunicorn yang membaca status semua task dari database metadata dan membuat status ini untuk UI Web.
- Web UI: Komponen ini memungkinkan pengguna di sisi klien untuk melihat dan mengedit status task dalam database metadata. Karena terpisahnya komponen antara Scheduler dan database, UI Web memungkinkan pengguna untuk memanipulasi aktivitas Scheduler.
- Scheduler: Scheduler, atau ‘Penjadwal’ adalah pemroses berupa daemon yang menggunakan definisi dari DAG. Bila dihubungkan dengan task dalam database metadata, scheduler berfungsi untuk menentukan task mana yang perlu dieksekusi lebih dulu serta prioritas pelaksanaannya. Pada umumnya, scheduler sendiri dijalankan sebagai sebuah service.
- Database Metadata: Sekumpulan data yang menyimpan informasi mengenai status dari task. Update dari database sendiri dilakukan dengan menggunakan lapisan abstraksi yang diimplementasikan pada SQLAlchemy.
- Executor: Executor adalah pemroses antrian pesan yang berhubungan dengan scheduler dalam menentukan proses worker agar benar-benar melaksanakan setiap task sesuai jadwal. Terdapat beberapa jenis executor, di mana masing-masing executor tersebut memiliki metode khusus untuk memfasilitasi worker bekerja, maupun dalam mengeksekusi task. Misal, LocalExecutor menjalankan tugas secara paralel pada mesin yang sama dengan proses scheduler. Ada pula executor lain seperti CeleryExecutor yang mengeksekusi task menggunakan proses yang ada pada sekelompok mesin worker yang terpisah.
- Worker: Ini adalah pemroses yang benar-benar melaksanakan logika task dan ditentukan pada executor apa yang digunakan.
Konsep Utama Airflow
Konsep Utama Airflow.
Workflow
Workflow adalah “DAGs”, atau merupakan gabungan dari berbagai DAG, yang terkoneksi dengan Sensor, Subdagm atau metode lainnya.
Workflow dalam Airflow adalah kumpulan task yang memiliki dependensi terarah. Secara khusus, Airflow menggunakan grafik asiklik terarah, atau Directed Acyclic Graph — disingkat DAG— untuk mewakili alur kerja. Setiap node dalam grafik adalah task, dan penghubung atau edges-nya menentukan dependensi di antara task. (Grafik ini diharuskan menjadi asiklik sehingga tidak ada dependensi melingkar, di mana hal ini dapat menyebabkan loop eksekusi tanpa akhir.
Beberapa properties DAG yang paling utama adalah sebagai berikut,
- dag_id: Pengidentifikasi unik di antara semua DAG,
- start_date: Titik waktu awal di mana task pada DAG akan dimulai,
- schedule_interval: Interval waktu sebuah DAG dieksekusi.
Selain dag_id, start_date, dan schedule_interval, setiap DAG dapat diinisialisasi dengan seperangkat default_arguments. Argumen default ini diwarisi ke semua task pada DAG tersebut.
Operator, Sensor, dan Task
Meskipun DAG digunakan untuk mengatur task dan mengatur konteks pelaksanaannya, DAGs tidak melakukan perhitungan yang sebenarnya. Sebagai gantinya, task adalah elemen dari Airflow yang berfungsi untuk melakukan pekerjaan yang ingin dilakukan.
Terdapat dua tipe task,
- Task dapat melakukan beberapa operasi eksplisit, di mana, dalam hal ini, mereka disebut sebagai operator, atau,
- Task dapat menghentikan sementara pelaksanaan task dependensi hingga beberapa kriteria telah terpenuhi, di mana, dalam hal ini mereka disebut sebagai sensor.
DagRuns dan TaskInstances
Setelah kita mendefinisikan DAG, membuat task, dan mendefinisikan dependensinya pada DAG , kita dapat mulai menjalankan task berdasarkan parameter DAG. Secara garis besar, konsep kunci dalam Airflow adalah execution_time. Ketika scheduler Airflow sedang berjalan, scheduler tersebut akan menentukan jadwal tanggal dengan interval yang teratur untuk menjalankan task terkait DAG. Waktu pelaksanaan dimulai pada start_date DAG dan berulang di setiap schedule_interval. Sebagai contoh, waktu eksekusi yang dijadwalkan adalah (2017–01–01 00:00:00, 2017–01–02 00:00:00, …) . Untuk setiap execution_time, DagRun dibuat dan beroperasi pada konteks waktu eksekusi itu. Dengan kata lain, DagRun hanyalah DAG dengan waktu eksekusi tertentu.
Semua tugas yang terkait dengan DagRun disebut sebagai TaskInstances. Singkat kata, TaskInstance adalah task yang telah dipakai dan memiliki konteks execution_date. DagRuns dan TaskInstances sendiri adalah konsep sentral dalam Airflow.
Setiap DagRun dan TaskInstance dikaitkan dengan entri dalam database metadata Airflow yang mencatat status mereka, misal, “queued”, “running”, “failed”, “skipped”, atau “up for retry”. Membaca dan memperbaharui status ini adalah kunci untuk penjadwalan dan proses eksekusi pada Airflow.
Fitur dan Manfaat Airflow
Manfaat Airflow.
Untuk kalian ingat, Airflow bukanlah..
Bukan Airflow.
Contoh Penggunaan Airflow pada Pipeline Data
Contoh Case Airfllow.
Bagaimana Cara Mengelola Beragam Jenis Pipeline Data?
Ilustrasi 1,000 Pipeline untuk Dieksekusi.
Airflow memiliki arsitektur modular dan menggunakan proses antrian pesan untuk mengatur jumlah worker berapapun jumlahnya. Airflow siap untuk ditingkatkan hingga tak terbatas karena Horizontal Scaling.
a. Arsitektur menggunakan Satu Node di Airflow.
Single Node Airflow.
b. Arsitektur menggunakan Multi-Node di Airflow.
Multi-Nodes Airflow.
Penggunaan Apache Airflow oleh Warung Pintar pada Google Cloud Composer
GCP Composer.
Nah, bagaimana Warung Pintar menggunakan Apache Airflow secara praktikal pada Google Cloud Composer?
- Notifikasi Airflow melalui Workplace merupakan salah satu implemen dari kustomisasi Airflow pada Warung Pintar, di mana detailnya dapat kalian pahami melalui link Open Source ini.
- Penerapan DAG Generator untuk pipeline Raw Table Data dari Database yang sering digunakan.
Alur Kerja Airflow
Pertama, scheduler membaca folder DAG mem-parsing dan melihat apakah script-nya sudah memenuhi kriteria.
Jika sudah memenuhi kriteria, scheduler akan membuat DagRun di DB dan me-register script-nya agar DagRun berstatus Running.
Selanjutnya, scheduler menjadwalkan TaskInstances untuk dijalankan hingga TaskInstance terjadwalkan “Scheduled”.
Lalu, scheduler mengirim TaskInstance ke Executor agar Executor mengirim TaskInstance ke sistem antrian agar terantrikan atau Queued.
Executor mengeluarkan TaskInstance, dilanjutkan dengan pembaharuan TaskInstance di MetaDB untuk dijalankan. Setelahnya, worker mengeksekusi TaskInstance.
Setelah task selesai, Executor akan memperbaharui TaskInstance ke Success. Walau demikian, DagRun masih berjalan ke task berikutnya dalam Dag tersebut.
Setelah semua task selesai dalam Dag, scheduler akan memperbaharui status menjadi Success
pada MetaDB DagRun. Jika terdapat task yang gagal, DagRun akan memperbaharuinya ke Failed
.
Akhirnya, Web Server membaca MetaDB ke Pembaharuan UI.
Ringkasan Alur Kerja Airflow
Masih belum begitu jelas? Berikut kesimpulannya,
- Scheduler membaca folder DAG,
- DAG diparsing untuk membuat DagRun berdasarkan parameter scheduler DAG kalian,
- TaskInstance digunakan untuk setiap task yang perlu dieksekusi dan ditandai ke “Scheduled” dalam database metadata,
- Scheduler mendapatkan semua TaskInstance yang ditandai “Scheduled” dari metadata database, mengubah status menjadi “Queued”, dan mengirimkannya ke Executor task yang akan dieksekusi.
- Executor mengeluarkan task dari antrian, tergantung pada pengaturan executor kalian, mengubah status dari “Queued” menjadi “Running”, dan worker pun mulai mengeksekusi TaskInstances.
- Ketika task selesai, executor mengubah status task tersebut ke status finalnya, seperti “Success”, “Failed”, dll. dalam database, di mana DagRun turut diperbarui oleh scheduler dengan status “Success” atau “Failed” dan Web Server secara berkala mengambil data dari metadaDB untuk memperbarui UI.