About this blueprint
Trigger Ingest BigQuery Outputs
This flow shows how to stream Kestra audit logs to BigQuery. The audit logs are stored in the kestra_auditlogs
Kafka topic. The flow consumes the audit logs from the Kafka topic, transforms the data, and loads it into BigQuery.
The flow is triggered every day at 10 AM UTC. You can customize the trigger by changing the cron expression, the timezone, and other properties of the Schedule trigger. For more information about cron expressions, see the Kestra Schedule trigger documentation.
yaml
# Flow identifier and the namespace the flow is declared in.
id: auditlogs_to_bigquery
namespace: company.team
# Pipeline steps: consume -> transform -> avro -> load. Each later task reads
# the previous task's output URI through an `outputs.<task id>.uri` expression.
tasks:
# Step 1: pull audit-log records from the `kestra_auditlogs` Kafka topic,
# deserializing each record value as JSON.
- id: consume
  type: io.kestra.plugin.kafka.Consume
  topic: kestra_auditlogs
  valueDeserializer: JSON
  # NOTE(review): presumably an upper bound on records fetched per run;
  # confirm the exact semantics in the Kafka plugin docs.
  maxRecords: 500
  # Raw Kafka client settings; the dotted names are single literal keys.
  properties:
    bootstrap.servers: "local:9092"
    auto.offset.reset: earliest
# Step 2: flatten every consumed record into top-level columns.
# The script runs against a `row` binding and mutates it in place.
- id: transform
  type: io.kestra.plugin.scripts.nashorn.FileTransform
  from: "{{ outputs.consume.uri }}"
  script: |
    var jacksonMapper = Java.type('io.kestra.core.serializers.JacksonMapper');

    // The record headers are dropped from the row.
    delete row['headers'];

    // Keep a handle on the original payload: row['value'] is overwritten below.
    var payload = row['value'];

    // Fields copied as-is from the payload to the row, in this order.
    var topLevelKeys = ['id', 'type', 'detail', 'date', 'deleted'];
    for (var i = 0; i < topLevelKeys.length; i++) {
      row[topLevelKeys[i]] = payload[topLevelKeys[i]];
    }

    // Replace the structured payload with its JSON string form.
    row['value'] = jacksonMapper.ofJson().writeValueAsString(payload);

    // [target column, key inside payload.detail] pairs, written out as
    // literals so no key is built by string concatenation.
    var detail = payload['detail'];
    var detailColumns = [
      ['detail_type', 'type'],
      ['detail_cls', 'cls'],
      ['detail_permission', 'permission'],
      ['detail_id', 'id'],
      ['detail_namespace', 'namespace'],
      ['detail_flowId', 'flowId'],
      ['detail_executionId', 'executionId']
    ];
    for (var j = 0; j < detailColumns.length; j++) {
      row[detailColumns[j][0]] = detail[detailColumns[j][1]];
    }
# Step 3: convert the transformed file to Avro using the schema below.
- id: avro
  type: io.kestra.plugin.serdes.avro.IonToAvro
  description: convert the file from Kestra internal storage to avro.
  from: "{{ outputs.transform.uri }}"
  # One field per column written by the transform script; every field is
  # declared as a nullable string (union of "null" and "string").
  schema: |
    {
      "type": "record",
      "name": "Root",
      "fields":
      [
        { "name": "id", "type": ["null", "string"] },
        { "name": "type", "type": ["null", "string"] },
        { "name": "detail", "type": ["null", "string"] },
        { "name": "date", "type": ["null", "string"] },
        { "name": "deleted", "type": ["null", "string"] },
        { "name": "value", "type": ["null", "string"] },
        { "name": "detail_type", "type": ["null", "string"] },
        { "name": "detail_cls", "type": ["null", "string"] },
        { "name": "detail_permission", "type": ["null", "string"] },
        { "name": "detail_id", "type": ["null", "string"] },
        { "name": "detail_namespace", "type": ["null", "string"] },
        { "name": "detail_flowId", "type": ["null", "string"] },
        { "name": "detail_executionId", "type": ["null", "string"] }
      ]
    }
# Step 4: load the Avro file produced by the `avro` task into BigQuery.
- id: load
  type: io.kestra.plugin.gcp.bigquery.Load
  # Placeholder project id; replace with your own GCP project.
  projectId: your_gcp_project
  # Credentials are read from the GCP_CREDS secret rather than stored inline.
  serviceAccount: "{{ secret('GCP_CREDS') }}"
  from: "{{ outputs.avro.uri }}"
  format: AVRO
  avroOptions:
    useAvroLogicalTypes: true
  # Table name spelled `auditlogs`, matching the flow id and the Kafka topic
  # (it was previously misspelled `autditlogs`).
  destinationTable: your_gcp_project.dwh.auditlogs
  # NOTE(review): per BigQuery's write dispositions, WRITE_TRUNCATE overwrites
  # the destination table on each load; confirm that is intended here rather
  # than WRITE_APPEND.
  writeDisposition: WRITE_TRUNCATE
# Runs the flow on a cron schedule: minute 0 of hour 10, every day.
# NOTE(review): no `timezone` is set on this trigger, while the blueprint text
# says 10 AM UTC; confirm which time zone the scheduler applies by default.
triggers:
- id: schedule
  type: io.kestra.plugin.core.trigger.Schedule
  cron: "0 10 * * *"