Kafka Producer Module
The kafka module produces records to Apache Kafka topics
asynchronously. Inbound requests pause while the record is in flight
and resume once the broker has acknowledged the write (or rejected
it), so the outcome is reflected in the module’s return code.
Setting names here correspond to librdkafka configuration properties;
for example client_id maps to client.id. A full reference for
each property is at:
- server
-
Initial list of brokers. One is enough - the rest of the cluster is discovered automatically. Specify additional servers one per line if you want bootstrap redundancy.
- client_id
-
Identifier the brokers see for this client. Shows up in broker logs and metrics; useful when multiple FreeRADIUS instances share a cluster.
- idempotence
-
Guarantee exactly-once, in-order delivery per partition. Requires the broker to acknowledge every message; slightly slower, but safe against retries creating duplicates.
- compression_type
-
One of
none,gzip,snappy,lz4,zstd. - queue_max_messages
-
Upper bound on unsent records buffered on this FreeRADIUS instance. Once the buffer is full, further produce attempts fail immediately rather than blocking.
- queue_max_delay
-
How long to wait to batch records before sending. Higher values increase throughput; lower values reduce end-to-end latency.
Declared topics. The module only writes to topics listed here, so typos in a virtual server are caught at startup instead of silently creating unintended topics on the broker.
Per-topic settings (acknowledgement policy, compression, partitioning, etc.) are nested inside each topic’s block.
- value
-
Payload for each record produced to this topic. Required. Evaluated per-call, so you can interpolate request attributes here.
- key
-
Optional partition key. When omitted, librdkafka picks a partition according to the configured partitioner.
- flush_timeout
-
How long to wait when shutting down for in-flight records to drain. Any records still queued after this expires are lost.
- properties
-
Escape hatch for librdkafka settings the module doesn’t expose as named knobs above. Values here are passed to librdkafka verbatim, so use its native units - for example
"500"rather than"500ms"for time values, and raw byte counts rather than"1M"for sizes.
Each declared topic also accepts its own properties { } block for
per-topic librdkafka settings that aren’t named knobs:
topic {
radius-accounting {
request_required_acks = -1
properties {
"compression.codec" = "lz4"
}
}
}
Usage
The module exposes one method, kafka.produce, and one xlat
expansion, %kafka.produce(…). Both send a record and pause the
request until the broker acknowledges or the broker rejects it; the
return code reflects the outcome:
-
okon successful delivery. -
failon transient errors (network or broker timeout). -
rejecton permanent errors (record too large, unknown topic).
Module method
Call the method as kafka.produce.<topic>; the topic name must match
one declared in the topic { } block above, and typos fail at
startup. kafka.send.<topic> and kafka.recv.<topic> are aliases
that do the same thing - pick whichever reads most naturally inside
the surrounding section (e.g. kafka.recv.auth inside recv
Access-Request { … }).
As a shortcut, you can write just kafka inside a recv or send
section and the topic is inferred from the section’s packet type. For
example:
recv Accounting-Request {
kafka # publishes to topic "Accounting-Request"
}
This is tidy when you name topics after packet types; if you don’t,
stick with the explicit kafka.produce.<topic> form. Either way, the
topic must be declared in topic { } or the virtual server will fail
to compile.
The record’s payload (and optional key) are declared per-topic, so different topics can publish different shapes from one module instance:
kafka {
server = "localhost:9092"
topic {
radius-accounting {
request_required_acks = -1
value = %json.encode(&request.[*])
key = &User-Name
}
radius-auth {
value = %{User-Name}
}
}
}
In a virtual server:
recv Accounting-Request {
kafka.produce.radius-accounting
}
recv Access-Request {
kafka.produce.radius-auth
}
value is required per topic. key is optional; when omitted,
librdkafka picks a partition according to the configured partitioner.
Xlat expansion
Use the xlat when the topic or payload is chosen per-request. The
first argument is the topic name, and the rest are the payload (or key
and payload). It returns true on successful delivery and false on
failure:
send Accounting-Response {
if (!%kafka.produce('radius-accounting', %json.encode(&request.[*]))) {
reject
}
}
As with the method, the topic argument must name a declared topic.
Default Configuration
kafka {
server = "localhost:9092"
# server = "broker2.example.com:9092"
# client_id = "freeradius"
# idempotence = yes
# compression_type = "lz4"
# queue_max_messages = 100000
# queue_max_delay = 5ms
topic {
radius-accounting {
request_required_acks = -1
# message_timeout = 10s
value = %json.encode(&request.[*])
# key = &User-Name
}
}
flush_timeout = 5s
# properties {
# "ssl.engine.id" = "pkcs11"
# "socket.keepalive.enable" = "true"
# }
}