Stream Kestra audit logs from a Kafka topic to BigQuery for analytics and troubleshooting

About this blueprint

Tags: Trigger, Ingest, BigQuery, Outputs

This flow shows how to stream Kestra audit logs to BigQuery. The audit logs are stored in the kestra_auditlogs Kafka topic. On each run, the flow consumes up to 500 records from that topic, flattens each record with a Nashorn script, converts the result to Avro, and loads it into a BigQuery table.

The flow is triggered every day at 10 AM UTC. You can customize the trigger by changing the cron expression, the timezone, and other properties of the Schedule trigger. For more information about cron expressions, see the Schedule trigger documentation.
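
As a small illustration of customizing the trigger, the fragment below is a sketch that runs the flow on weekdays at 10 AM in a specific timezone. The cron expression and the Europe/Berlin timezone are assumptions chosen for the example, not part of the blueprint; replace them with your own values.

```yaml
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    # Assumption: weekdays only (Monday to Friday) at 10:00.
    cron: "0 10 * * 1-5"
    # Assumption: example timezone; the cron expression is evaluated in this zone.
    timezone: Europe/Berlin
```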

yaml
id: auditlogs_to_bigquery
namespace: company.team
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: local:9092
    topic: kestra_auditlogs
    valueDeserializer: JSON
    maxRecords: 500
  - id: transform
    type: io.kestra.plugin.scripts.nashorn.FileTransform
    from: "{{ outputs.consume.uri }}"
    script: |
      // Kestra's JacksonMapper is used below to serialize the payload to JSON.
      var jacksonMapper = Java.type('io.kestra.core.serializers.JacksonMapper');
      // Drop the Kafka record headers.
      delete row['headers'];

      var value = row['value'];

      // Promote the top-level audit log fields to columns.
      row['id'] = value['id'];
      row['type'] = value['type'];
      row['detail'] = value['detail'];
      row['date'] = value['date'];
      row['deleted'] = value['deleted'];
      // Keep the full original payload as a JSON string.
      row['value'] = jacksonMapper.ofJson().writeValueAsString(value);
      // Flatten the nested detail object into separate columns.
      row['detail_type'] = value['detail']['type'];
      row['detail_cls'] = value['detail']['cls'];
      row['detail_permission'] = value['detail']['permission'];
      row['detail_id'] = value['detail']['id'];
      row['detail_namespace'] = value['detail']['namespace'];
      row['detail_flowId'] = value['detail']['flowId'];
      row['detail_executionId'] = value['detail']['executionId'];
  - id: avro
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.transform.uri }}"
    description: Convert the Ion file in Kestra internal storage to Avro.
    schema: |
      {
        "type": "record",
        "name": "Root",
        "fields":
          [
            { "name": "id", "type": ["null", "string"] },
            { "name": "type", "type": ["null", "string"] },
            { "name": "detail", "type": ["null", "string"] },
            { "name": "date", "type": ["null", "string"] },
            { "name": "deleted", "type": ["null", "string"] },
            { "name": "value", "type": ["null", "string"] },
            { "name": "detail_type", "type": ["null", "string"] },
            { "name": "detail_cls", "type": ["null", "string"] },
            { "name": "detail_permission", "type": ["null", "string"] },
            { "name": "detail_id", "type": ["null", "string"] },
            { "name": "detail_namespace", "type": ["null", "string"] },
            { "name": "detail_flowId", "type": ["null", "string"] },
            { "name": "detail_executionId", "type": ["null", "string"] }
          ]
      }
  - id: load
    type: io.kestra.plugin.gcp.bigquery.Load
    avroOptions:
      useAvroLogicalTypes: true
    destinationTable: your_gcp_project.dwh.auditlogs
    format: AVRO
    from: "{{ outputs.avro.uri }}"
    writeDisposition: WRITE_TRUNCATE
    serviceAccount: "{{ secret('GCP_CREDS') }}"
    projectId: your_gcp_project
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: 0 10 * * *
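
As written, the load task uses WRITE_TRUNCATE, so each run replaces the contents of the destination table, and the consume task sets no consumer group. If you would rather accumulate records across runs, one possible variation is sketched below: set a groupId on the Kafka task and switch the load to WRITE_APPEND. The group name kestra_auditlogs_bq is hypothetical. The expectation that a consumer group makes the task fetch only records not yet consumed is based on the Kafka plugin documentation, so verify it against your plugin version before relying on it.

```yaml
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: local:9092
    topic: kestra_auditlogs
    valueDeserializer: JSON
    maxRecords: 500
    # Assumption: hypothetical consumer group name, so offsets are tracked between runs.
    groupId: kestra_auditlogs_bq

  # The transform and avro tasks stay as in the flow above.

  - id: load
    type: io.kestra.plugin.gcp.bigquery.Load
    # Other properties stay as in the flow above.
    # Assumption: append each batch instead of replacing the table.
    writeDisposition: WRITE_APPEND
```

Whether appending is appropriate depends on how you query the table; with WRITE_APPEND, a record consumed twice would appear twice, so deduplicating on id downstream may be worth considering.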
