Blueprints
Anomaly detection using DuckDB SQL query and S3 file event trigger, sending a CSV file attachment via email if anomalies are detected
About this blueprint
S3 Trigger Notifications DuckDB
This flow will be triggered any time a new file arrives in a given S3 bucket
and source_prefix
folder.
The flow will check for anomalies in the data from that file using a DuckDB query, and will move the file to the same S3 bucket below the destination_prefix
folder.
If anomalies are detected, the flow will send an email to the recipients specified on the to
property, and will send anomalous rows as a CSV file attachment in the same email.
If you use MotherDuck, use Kestra Secret to store the MotherDuck service token. Then, modify the query
task as follows to point the task to your MotherDuck database:
yaml
- id: query
type: io.kestra.plugin.jdbc.duckdb.Query
description: Validate new file for anomalies
sql: |
SELECT *
FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}')
WHERE price * quantity != total;
store: true
url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
yaml
id: s3_trigger_duckdb
namespace: company.team
variables:
bucket: kestraio
source_prefix: monthly_orders
destination_prefix: stage_orders
tasks:
- id: query
type: io.kestra.plugin.jdbc.duckdb.Query
description: Validate new file for anomalies
sql: >
INSTALL httpfs;
LOAD httpfs;
SET s3_region='{{ secret('AWS_DEFAULT_REGION') }}';
SET s3_access_key_id='{{ secret('AWS_ACCESS_KEY_ID') }}';
SET s3_secret_access_key='{{ secret('AWS_SECRET_ACCESS_KEY') }}';
SELECT *
FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix
}}/{{ trigger.objects | jq('.[].key') | first }}')
WHERE price * quantity != total;
store: true
- id: csv
type: io.kestra.plugin.serdes.csv.IonToCsv
description: Create CSV file from query results
from: "{{ outputs.query.uri }}"
- id: if_anomalies_detected
type: io.kestra.plugin.core.flow.If
condition: "{{ outputs.query.size }}"
description: Send an email if outliers detected
then:
- id: send_email_alert
type: io.kestra.plugin.notifications.mail.MailSend
subject: Anomalies in data detected
from: anna@kestra.io
to: anna@kestra.io
username: anna@kestra.io
host: mail.privateemail.com
port: 465
password: "{{ secret('EMAIL_PASSWORD') }}"
sessionTimeout: 6000
attachments:
- name: anomalies_in_orders.csv
uri: "{{ outputs.csv.uri }}"
htmlTextContent: >
Detected anomalies in sales data in file <b>s3://{{ vars.bucket }}/{{
vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first
}}'</b>. <br />
Anomalous rows are attached in a CSV file.<br /><br />
Best regards,<br />
Data Team
triggers:
- id: poll_for_new_s3_files
type: io.kestra.plugin.aws.s3.Trigger
bucket: "{{ vars.bucket }}"
prefix: "{{ vars.source_prefix }}"
maxKeys: 1
interval: PT1S
filter: FILES
action: MOVE
moveTo:
key: "{{ vars.destination_prefix }}/{{ vars.source_prefix }}"
region: "{{ secret('AWS_DEFAULT_REGION') }}"
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"