About this blueprint
CLI Docker AWS Python DevOps Git Outputs Task Runner
This flow clones a Git repository containing a Terraform configuration and applies it to provision the resources needed to run script tasks on AWS ECS Fargate, including the AWS Batch compute environment, job queue, and ECS task roles.
The only prerequisites are AWS credentials and an S3 Bucket in the same region in which you want to run AWS Batch jobs.
We assume that you have a default VPC in the region you are deploying to. If you do not have one, you can create it with the following command:
aws ec2 create-default-vpc --region us-east-1 # replace with your chosen AWS region
Once the flow completes, you can download the Terraform output to see the ARNs of the AWS Batch compute environment, job queue, and ECS task roles.
You can store these ARNs as `pluginDefaults` in your namespace to run all Python scripts on AWS ECS Fargate by default.
# Flow identity.
id: aws_batch_terraform_git
namespace: company.team

# Runtime parameters, referenced by the tasks as {{ inputs.<id> }}.
inputs:
# S3 bucket name: used as the Terraform state backend bucket, exported to
# Terraform as TF_VAR_bucket, and passed to the AWS Batch task runners.
- id: bucket
  type: STRING
  defaults: kestra-us
# AWS region: used for the Terraform backend, TF_VAR_region,
# AWS_DEFAULT_REGION and the AWS Batch task runners.
- id: region
  type: STRING
  defaults: us-east-1
tasks:
# Groups the clone and the Terraform run so that `tf` operates on the files
# checked out by `git` (its first command moves them into the current dir).
- id: wdir
  type: io.kestra.plugin.core.flow.WorkingDirectory
  tasks:
  # Fetch the repository that holds the Terraform configuration.
  - id: git
    type: io.kestra.plugin.git.Clone
    url: https://github.com/kestra-io/terraform-deployments
    branch: main

  # Provision the AWS resources and capture the Terraform outputs.
  - id: tf
    type: io.kestra.plugin.terraform.cli.TerraformCLI
    inputFiles:
      # Backend definition rendered from the flow inputs: state is stored in
      # the given S3 bucket under the key "terraform.tfstate".
      backend.tf: |
        terraform {
          backend "s3" {
            region = "{{ inputs.region }}"
            bucket = "{{ inputs.bucket }}"
            key = "terraform.tfstate"
          }
        }
    commands:
    # Move the contents of the aws-batch/ directory next to backend.tf.
    - mv aws-batch/* .
    - terraform init
    - terraform apply -auto-approve
    # Persist the outputs to a file so that a later task can read them.
    - terraform output > output.txt
    env:
      TF_VAR_region: "{{ inputs.region }}"
      TF_VAR_bucket: "{{ inputs.bucket }}"
      # Credentials are read from Kestra secrets, never hard-coded.
      AWS_ACCESS_KEY_ID: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      AWS_SECRET_ACCESS_KEY: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      AWS_DEFAULT_REGION: "{{ inputs.region }}"
    # Quoted: an unquoted leading `*` would be parsed as a YAML alias.
    outputFiles:
    - "*.txt"
# Turns the text written by `terraform output > output.txt` into task
# outputs, so later tasks can read {{ outputs.parse_tf_output.vars.<name> }}.
- id: parse_tf_output
  type: io.kestra.plugin.scripts.python.Script
  containerImage: ghcr.io/kestra-io/kestrapy:latest
  inputFiles:
    # The output.txt file captured by the `tf` task, exposed as terraform.txt.
    terraform.txt: "{{ outputs.tf.outputFiles['output.txt'] }}"
  taskRunner:
    type: io.kestra.plugin.scripts.runner.docker.Docker
  script: |
    from kestra import Kestra

    outputs = {}
    with open("terraform.txt", "r") as file:
        for line in file:
            # Blank lines and indented lines (the body of a multi-line
            # list/map value) are not `name = "value"` assignments.
            if line[:1].isspace():
                continue
            # Split on the first " = " only, so a value that itself contains
            # " = " does not break the unpacking.
            key, separator, value = line.strip().partition(" = ")
            if not separator:
                continue
            outputs[key.strip()] = value.strip('"')

    Kestra.outputs(outputs)
# Runs two Python tasks side by side, each through the AWS Batch task runner
# configured with the ARNs extracted by `parse_tf_output`.
- id: parallel_ecs_fargate_tasks
  type: io.kestra.plugin.core.flow.Parallel
  tasks:
  # Shell-style commands executed in the kestrapy image.
  - id: run_command
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: ghcr.io/kestra-io/kestrapy:latest
    taskRunner:
      type: io.kestra.plugin.ee.aws.runner.Batch
      computeEnvironmentArn: "{{ outputs.parse_tf_output.vars.batch_compute_environment_arn }}"
      jobQueueArn: "{{ outputs.parse_tf_output.vars.batch_job_queue_arn }}"
      executionRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_execution_role_arn }}"
      taskRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_role_arn }}"
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "{{ inputs.region }}"
      bucket: "{{ inputs.bucket }}"
    commands:
    - pip show kestra

  # Inline Python script executed in the pydata image.
  - id: run_python_script
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest
    taskRunner:
      type: io.kestra.plugin.ee.aws.runner.Batch
      computeEnvironmentArn: "{{ outputs.parse_tf_output.vars.batch_compute_environment_arn }}"
      jobQueueArn: "{{ outputs.parse_tf_output.vars.batch_job_queue_arn }}"
      executionRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_execution_role_arn }}"
      taskRoleArn: "{{ outputs.parse_tf_output.vars.ecs_task_role_arn }}"
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
      region: "{{ inputs.region }}"
      bucket: "{{ inputs.bucket }}"
    # Literal block scalar (`|`): a folded scalar (`>`) would join the
    # same-indent Python lines with spaces and produce invalid code.
    script: |
      import platform
      import socket
      import sys

      print("Hello from AWS Batch and kestra!")


      def print_environment_info():
          print(f"Host's network name: {platform.node()}")
          print(f"Python version: {platform.python_version()}")
          print(f"Platform information (instance type): {platform.platform()}")
          print(f"OS/Arch: {sys.platform}/{platform.machine()}")

          # Name resolution can fail inside a container; report it instead
          # of failing the task.
          try:
              hostname = socket.gethostname()
              ip_address = socket.gethostbyname(hostname)
              print(f"Host IP Address: {ip_address}")
          except socket.error:
              print("Unable to obtain IP address.")


      print_environment_info()