Source
id: auditlogs-to-bigquery
namespace: company.team
tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      auto.offset.reset: earliest
      bootstrap.servers: localhost:9092
    topic: kestra_auditlogs
    valueDeserializer: JSON
    maxRecords: 500
  - id: transform
    type: io.kestra.plugin.graalvm.js.FileTransform
    from: "{{ outputs.consume.uri }}"
    script: |
      var jacksonMapper = Java.type('io.kestra.core.serializers.JacksonMapper');
      // drop Kafka record metadata that is not part of the BigQuery schema
      delete row['headers'];
      var value = row['value'];
      row['id'] = value['id'];
      row['type'] = value['type'];
      // the Avro schema declares 'detail' and 'value' as strings, so serialize them to JSON
      row['detail'] = jacksonMapper.ofJson().writeValueAsString(value['detail']);
      row['date'] = value['date'];
      row['deleted'] = value['deleted'];
      row['value'] = jacksonMapper.ofJson().writeValueAsString(value);
      // flatten the nested 'detail' object into dedicated columns
      row['detail_type'] = value['detail']['type'];
      row['detail_cls'] = value['detail']['cls'];
      row['detail_permission'] = value['detail']['permission'];
      row['detail_id'] = value['detail']['id'];
      row['detail_namespace'] = value['detail']['namespace'];
      row['detail_flowId'] = value['detail']['flowId'];
      row['detail_executionId'] = value['detail']['executionId'];
  - id: avro
    type: io.kestra.plugin.serdes.avro.IonToAvro
    from: "{{ outputs.transform.uri }}"
    description: Convert the file from Kestra's internal storage to Avro.
    schema: |
      {
        "type": "record",
        "name": "Root",
        "fields":
          [
            { "name": "id", "type": ["null", "string"] },
            { "name": "type", "type": ["null", "string"] },
            { "name": "detail", "type": ["null", "string"] },
            { "name": "date", "type": ["null", "string"] },
            { "name": "deleted", "type": ["null", "string"] },
            { "name": "value", "type": ["null", "string"] },
            { "name": "detail_type", "type": ["null", "string"] },
            { "name": "detail_cls", "type": ["null", "string"] },
            { "name": "detail_permission", "type": ["null", "string"] },
            { "name": "detail_id", "type": ["null", "string"] },
            { "name": "detail_namespace", "type": ["null", "string"] },
            { "name": "detail_flowId", "type": ["null", "string"] },
            { "name": "detail_executionId", "type": ["null", "string"] }
          ]
      }
  - id: load
    type: io.kestra.plugin.gcp.bigquery.Load
    avroOptions:
      useAvroLogicalTypes: true
    destinationTable: your_gcp_project.dwh.auditlogs
    format: AVRO
    from: "{{outputs.avro.uri }}"
    writeDisposition: WRITE_TRUNCATE
    serviceAccount: "{{ secret('GCP_CREDS') }}"
    projectId: your_gcp_project
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 * * *"
About this blueprint
This flow shows how to stream Kestra audit logs to BigQuery. The audit logs
are stored in the kestra_auditlogs Kafka topic. The flow consumes the audit
logs from that topic, flattens each record with a JavaScript transform,
converts the result to Avro, and loads it into BigQuery; a hypothetical
record is sketched below.
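To make the transform step concrete, here is a hypothetical value of a single
consumed record before flattening. The field names mirror the keys read by the
JavaScript transform above; the concrete values are placeholders rather than
real audit log data.
{
  "id": "<audit-log-id>",
  "type": "<event-type>",
  "date": "2024-05-01T10:00:00Z",
  "deleted": false,
  "detail": {
    "type": "<detail-type>",
    "cls": "<java-class-name>",
    "permission": "<permission>",
    "id": "<resource-id>",
    "namespace": "company.team",
    "flowId": "auditlogs-to-bigquery",
    "executionId": "<execution-id>"
  }
}
The transform keeps the top-level fields, stores the full value and the detail
object as JSON strings, and copies each nested detail key into its own
detail_* column so it can be queried directly in BigQuery.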
The flow is triggered every day at 10 AM UTC. You can customize the trigger by changing the cron expression, the timezone, and more; see the Schedule trigger documentation for details on cron expressions.
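For example, to run the flow every evening in a specific timezone instead of
10 AM UTC, you could adjust the trigger as follows (the cron expression and
timezone below are only an illustration):
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 18 * * *"
    timezone: Europe/Paris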