load_kafka

Loads a byte stream from an Apache Kafka topic.

load_kafka topic:string, [count=int, exit=bool, offset=int|string,
options=record, aws_iam=record]

The load_kafka operator reads bytes from a Kafka topic.

The implementation uses the official librdkafka from Confluent and supports all of its configuration options. You can specify them via the options parameter as {key: value, ...}.

The operator injects the following default librdkafka configuration values when no configuration file is present or when the configuration does not set them:

  • bootstrap.servers: localhost
  • client.id: tenzir
  • group.id: tenzir
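As a minimal sketch, these defaults could be overridden for a single pipeline through the options parameter. The broker address and consumer group name below are hypothetical placeholders, not values from this documentation.

```tql
// Hypothetical broker address and consumer group; replace with your own.
load_kafka "tenzir", options={"bootstrap.servers": "broker.example.com:9092", "group.id": "example-group"}
read_json
```

Keys that are not set this way, such as client.id here, would still fall back to the defaults listed above.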

topic: string

The Kafka topic to use.

count = int (optional)

Exit successfully after having consumed count messages.

exit = bool (optional)

Exit successfully after having received the last message.

Without this option, the operator waits for new messages after having consumed the last one.
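For instance, a pipeline that should drain the messages currently in a topic and then terminate might set exit=true. This is a sketch that reuses the topic name tenzir from the examples on this page and assumes the messages are JSON.

```tql
// Stop after the last available message instead of waiting for new ones.
load_kafka "tenzir", exit=true
read_json
```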

offset = int|string (optional)

The offset to start consuming from. Possible values are:

  • "beginning": first offset
  • "end": last offset
  • "stored": stored offset
  • <value>: absolute offset
  • -<value>: relative offset from end
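The following sketch illustrates a relative offset. It assumes that a negative integer is passed as a plain int argument and that the messages are JSON; how the offset applies across multiple partitions is not stated here, so treat the comment as an expectation rather than a guarantee.

```tql
// Assumed behavior: start 10 messages before the end, then exit at the last message.
load_kafka "tenzir", offset=-10, exit=true
read_json
```

The string forms work the same way, for example offset="stored" to resume from the stored offset.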

options = record (optional)

A record of key-value configuration options for librdkafka, e.g., {"auto.offset.reset": "earliest", "enable.partition.eof": true}.

The load_kafka operator passes the key-value pairs directly to librdkafka. Consult the list of available configuration options to configure Kafka according to your needs.

We recommend factoring these options into the plugin-specific kafka.yaml so that they are independent of the load_kafka arguments.

aws_iam = record (optional)

If specified, enables AWS IAM authentication for MSK. The keys must be non-empty when specified.

Available keys:

  • region: Region of the MSK cluster. Must be specified when using IAM.
  • assume_role: Optional role ARN to assume.
  • session_name: Optional session name to use when assuming a role.
  • external_id: Optional external ID to use when assuming a role.
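A minimal sketch of an MSK connection with IAM authentication could look as follows. The topic name, broker hostname, account ID, role name, and session name are all hypothetical placeholders; port 9098 is the port MSK commonly uses for IAM access control, so verify it against your cluster.

```tql
// All identifiers below are placeholders; replace them with your cluster's values.
load_kafka "events", options={"bootstrap.servers": "b-1.example.kafka.eu-west-1.amazonaws.com:9098"}, aws_iam={region: "eu-west-1", assume_role: "arn:aws:iam::123456789012:role/example-kafka-reader", session_name: "tenzir-example"}
read_json
```

If no role needs to be assumed, passing only region in the aws_iam record should be sufficient, since it is the one required key.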

The operator tries to obtain credentials in the following order:

  1. Checks your environment variables for AWS credentials.
  2. Checks your $HOME/.aws/credentials file for a profile and credentials.
  3. Contacts and logs in to a trusted identity provider. The login information for these providers can be supplied either through the environment variables AWS_ROLE_ARN, AWS_WEB_IDENTITY_TOKEN_FILE, and AWS_ROLE_SESSION_NAME, or through a profile in your $HOME/.aws/credentials.
  4. Checks for an external method, set as part of a profile in $HOME/.aws/config, that generates or looks up credentials in a way that isn't directly supported by AWS.
  5. Contacts the ECS Task Role to request credentials if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI has been set.
  6. Contacts the EC2 Instance Metadata service to request credentials if AWS_EC2_METADATA_DISABLED is not set to ON.

Read 100 JSON messages from the topic tenzir

load_kafka "tenzir", count=100
read_json

Read Zeek Streaming JSON logs starting at the beginning

load_kafka "zeek", offset="beginning"
read_zeek_json

See Also

save_kafka