API Reference

class kafka_async.Producer(config: Mapping[str, Any])
async abort_transaction(timeout: float | None = None) None
begin_transaction() None
async commit_transaction(timeout: float | None = None) None
async flush(timeout: float | None = None) None
async init_transactions(timeout: float | None = None) None
async list_topics(topic: str | None = None, timeout: float | None = None) ClusterMetadata
async poll(timeout: float | None = None) int
produce(topic: str, value: str | bytes | None = None, *, key: str | bytes | None = None, partition: int | None = None, on_delivery: Callable[[KafkaError | None, Message], Awaitable[None] | None] | None = None, timestamp: int | datetime | None = None, headers: Mapping[str, str | bytes | None] | Sequence[tuple[str, str | bytes | None]] | None = None) None
async purge(in_queue: bool = True, in_flight: bool = True, blocking: bool = True) None
async send_offsets_to_transaction(positions: Iterable[TopicPartition], group_metadata: object, timeout: float | None = None) None
set_sasl_credentials(username: str, password: str) None
class kafka_async.Consumer(config: Mapping[str, Any])
assign(partitions: Iterable[TopicPartition]) None
assignment() list[TopicPartition]
async close() None
commit(message: Message | None = None, offsets: Iterable[TopicPartition] | None = None, *, asynchronous: Literal[True] = True) None
commit(message: Message | None = None, offsets: Iterable[TopicPartition] | None = None, *, asynchronous: Literal[False]) list[TopicPartition]
async committed(partitions: Iterable[TopicPartition], timeout: float | None = None) list[TopicPartition]
async consume(num_messages: int = 1, timeout: float | None = None) list[Message]
consumer_group_metadata() object
get_watermark_offsets(partition: TopicPartition, *, cached: Literal[True]) tuple[int, int]
get_watermark_offsets(partition: TopicPartition, timeout: float | None = None, *, cached: Literal[False] = False) tuple[int, int]
incremental_assign(partitions: Iterable[TopicPartition]) None
incremental_unassign(partitions: Iterable[TopicPartition]) None
async list_topics(topic: str | None = None, timeout: float | None = None) ClusterMetadata
memberid() str | None
async offsets_for_times(partitions: Iterable[TopicPartition], timeout: float | None = None) list[TopicPartition]
pause(partitions: Iterable[TopicPartition]) None
async poll(timeout: float | None = None) Message | None
position(partitions: Iterable[TopicPartition]) list[TopicPartition]
resume(partitions: Iterable[TopicPartition]) None
seek(partition: TopicPartition) None
set_sasl_credentials(username: str, password: str) None
store_offsets(message: Message | None = None, offsets: Iterable[TopicPartition] | None = None) None
subscribe(topics: Iterable[str], *, on_assign: Callable[[Consumer, list[TopicPartition]], Awaitable[None] | None] | None = None, on_revoke: Callable[[Consumer, list[TopicPartition]], Awaitable[None] | None] | None = None, on_lost: Callable[[Consumer, list[TopicPartition]], Awaitable[None] | None] | None = None) None
unassign() None
unsubscribe() None
class kafka_async.AdminClient(config: Mapping[str, Any], *, logger: Logger | None = None)
create_topics(new_topics: Iterable[NewTopic], *, operation_timeout: float | None = None, request_timeout: float | None = None, validate_only: bool = False) FuturesDict[str, None]
delete_topics(topics: Iterable[str], *, operation_timeout: float | None = None, request_timeout: float | None = None) FuturesDict[str, None]
async list_topics(topic: str | None = None, timeout: float | None = None) ClusterMetadata
create_partitions(new_partitions: Iterable[NewPartitions], *, operation_timeout: float | None = None, request_timeout: float | None = None, validate_only: bool = False) FuturesDict[str, None]
describe_configs(resources: Iterable[ConfigResource], *, request_timeout: float | None = None) FuturesDict[ConfigResource, dict[str, ConfigEntry]]
incremental_alter_configs(resources: Iterable[ConfigResource], *, request_timeout: float | None = None, validate_only: bool = False, broker: int | None = None) FuturesDict[ConfigResource, None]
create_acls(acls: Iterable[AclBinding], *, request_timeout: float | None = None) FuturesDict[AclBinding, None]
async describe_acls(acl_binding_filter: AclBindingFilter, *, request_timeout: float | None = None) list[AclBinding]
delete_acls(acl_binding_filters: Iterable[AclBindingFilter], *, request_timeout: float | None = None) FuturesDict[AclBindingFilter, list[AclBinding]]
async list_consumer_groups(*, request_timeout: float | None = None, states: Iterable[ConsumerGroupState] | None = None, types: Iterable[ConsumerGroupType] | None = None) ListConsumerGroupsResult
describe_consumer_groups(group_ids: Iterable[str], *, include_authorized_operations: bool = False, request_timeout: float | None = None) FuturesDict[str, ConsumerGroupDescription]
describe_topics(topics: TopicCollection | Iterable[str], *, include_authorized_operations: bool = False, request_timeout: float | None = None) FuturesDict[str, TopicDescription]
async describe_cluster(*, include_authorized_operations: bool = False, request_timeout: float | None = None) DescribeClusterResult
delete_consumer_groups(group_ids: Iterable[str], *, request_timeout: float | None = None) FuturesDict[str, None]
list_consumer_group_offsets(list_consumer_group_offsets_request: Iterable[ConsumerGroupTopicPartitions], *, require_stable: bool = False, request_timeout: float | None = None) FuturesDict[str, ConsumerGroupTopicPartitions]
alter_consumer_group_offsets(alter_consumer_group_offsets_request: Iterable[ConsumerGroupTopicPartitions], *, request_timeout: float | None = None) FuturesDict[ConsumerGroupTopicPartitions, ConsumerGroupTopicPartitions]
set_sasl_credentials(username: str, password: str) None
describe_user_scram_credentials(users: None = None, *, request_timeout: float | None = None) dict[str, UserScramCredentialsDescription]
describe_user_scram_credentials(users: Iterable[str], *, request_timeout: float | None = None) FuturesDict[str, UserScramCredentialsDescription]
describe_user_scram_credentials(users: None | Iterable[str], *, request_timeout: float | None = None) Coroutine[Any, Any, dict[str, UserScramCredentialsDescription]] | FuturesDict[str, UserScramCredentialsDescription]
alter_user_scram_credentials(alterations: Iterable[UserScramCredentialAlteration], *, request_timeout: float | None = None) FuturesDict[str, None]
list_offsets(topic_partition_offsets: Mapping[TopicPartition, OffsetSpec], *, isolation_level: IsolationLevel | None = None, request_timeout: float | None = None) FuturesDict[TopicPartition, ListOffsetsResultInfo]
delete_records(topic_partition_offsets: Iterable[TopicPartition], *, request_timeout: float | None = None, operation_timeout: float | None = None) FuturesDict[TopicPartition, DeletedRecords]
async elect_leaders(election_type: ElectionType, partitions: Iterable[TopicPartition] | None = None, *, request_timeout: float | None = None, operation_timeout: float | None = None) dict[TopicPartition, KafkaException | None]
class kafka_async.FuturesDict

A dictionary whose values are awaitable objects. The dictionary itself can be awaited to get a dictionary with the same keys mapped to the awaited results.

async def foo(x: int) -> int:
    return x ** 2

data = FuturesDict({1: foo(1), 2: foo(2)})
assert await data == {1: 1, 2: 4}
classmethod from_concurrent_futures(futures: Mapping[_K, Future[_V]]) Self