Run tasks or subflows in parallel, create loops, and add conditional branching.
Add parallelism using Flowable tasks
One of the most common orchestration requirements is to execute independent processes in parallel. For example, you can process each data partition in parallel. Because the partitions do not depend on each other, this can significantly reduce total processing time compared with handling them one after another.
The flow below uses the `EachParallel` flowable task to execute a list of tasks in parallel:
- The `value` property defines the list of items to iterate over.
- The `tasks` property defines the list of tasks to execute for each item in the list. You can access the current iteration value using the `{{ taskrun.value }}` variable.
```yaml
id: python_partitions
namespace: company.team
description: Process partitions in parallel

tasks:
  # Generate the list of partition file names and expose it as an output
  - id: getPartitions
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/pydata:latest
    script: |
      from kestra import Kestra

      partitions = [f"file_{nr}.parquet" for nr in range(1, 10)]
      Kestra.outputs({'partitions': partitions})

  # Run the child task once per partition, all in parallel
  - id: processPartitions
    type: io.kestra.plugin.core.flow.EachParallel
    value: '{{ outputs.getPartitions.vars.partitions }}'
    tasks:
      - id: partition
        type: io.kestra.plugin.scripts.python.Script
        taskRunner:
          type: io.kestra.plugin.scripts.runner.docker.Docker
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import random
          import time
          from kestra import Kestra

          # taskrun.value holds the partition name for this iteration
          filename = '{{ taskrun.value }}'
          print(f"Reading and processing partition {filename}")

          # Simulate work: a random row count and a random processing time
          nr_rows = random.randint(1, 1000)
          processing_time = random.randint(1, 20)
          time.sleep(processing_time)

          # Report per-partition metrics, tagged with the partition name
          Kestra.counter('nr_rows', nr_rows, {'partition': filename})
          Kestra.timer('processing_time', processing_time, {'partition': filename})
```
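To make the fan-out behavior concrete outside Kestra, the sketch below runs the same per-partition work locally with Python's standard `concurrent.futures` module. It is only an analogy for what `EachParallel` does (one run of the child tasks per item in `value`, started without waiting for the previous item to finish); it is not how Kestra schedules task runs. The functions `get_partitions`, `process_partition`, and `process_in_parallel` are hypothetical stand-ins for the `getPartitions` and `partition` tasks, and the sleep times are shortened from 1 to 20 seconds so the example finishes quickly.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def get_partitions() -> list[str]:
    # Same list as the getPartitions task: file_1.parquet ... file_9.parquet
    return [f"file_{nr}.parquet" for nr in range(1, 10)]


def process_partition(filename: str) -> dict:
    # Hypothetical stand-in for the `partition` task;
    # `filename` plays the role of {{ taskrun.value }}
    nr_rows = random.randint(1, 1000)
    # Shortened from randint(1, 20) seconds for a quick local run
    processing_time = random.uniform(0.05, 0.2)
    time.sleep(processing_time)
    return {"partition": filename, "nr_rows": nr_rows, "processing_time": processing_time}


def process_in_parallel(partitions: list[str]) -> list[dict]:
    # One worker per partition so every item starts immediately
    # (at least one worker, because ThreadPoolExecutor rejects max_workers=0)
    with ThreadPoolExecutor(max_workers=max(1, len(partitions))) as pool:
        # map() returns results in input order, even though the work runs concurrently
        return list(pool.map(process_partition, partitions))


if __name__ == "__main__":
    start = time.perf_counter()
    results = process_in_parallel(get_partitions())
    elapsed = time.perf_counter() - start
    for r in results:
        print(f"{r['partition']}: {r['nr_rows']} rows in {r['processing_time']:.2f}s")
    # Wall-clock time is close to the slowest partition, not the sum of all of them
    print(f"total wall-clock: {elapsed:.2f}s")
```

Threads are sufficient here because each item spends its time waiting in `time.sleep`; CPU-bound processing in plain Python would call for `ProcessPoolExecutor` instead.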