API

asynckafka.set_debug()

Set the debug status in asynckafka. By default, some debug logging calls in the performance-critical path are disabled. Activating debug mode has a performance cost even if Python logging is not enabled, since the calls to the logger are still made and the strings for them are still built.

Parameters

set_debug (bool) – Enable or disable debug mode.
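The cost described above is a general property of eager string formatting in Python, not something specific to asynckafka. The sketch below uses only the standard logging module to show it. The Expensive class and the logger name are invented for illustration, and the snippet does not reproduce asynckafka's internal logging calls.

```python
import logging


class Expensive:
    """Counts how many times it is formatted into a string."""

    def __init__(self):
        self.format_calls = 0

    def __str__(self):
        self.format_calls += 1
        return "payload"


logger = logging.getLogger("debug_cost_demo")  # hypothetical logger name
logger.setLevel(logging.WARNING)  # debug records are discarded

eager = Expensive()
# The f-string is built before logger.debug() runs, so the formatting
# cost is paid even though the record is dropped.
logger.debug(f"received {eager}")

lazy = Expensive()
# With %-style arguments the logger defers formatting, and because the
# debug level is disabled it never formats the argument at all.
logger.debug("received %s", lazy)

print(eager.format_calls, lazy.format_calls)
```

Even in the deferred form the call to the logger itself is still made, which is the remaining overhead that leaving debug mode off avoids.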

asynckafka.is_in_debug()

Returns

True if debug mode is enabled, False if not.

Return type

bool

Consumer

class asynckafka.Consumer

TODO DOC

Example

The consumer works as an async iterator:

consumer = Consumer(['my_topic'])
consumer.start()

async for message in consumer:
    print(message.payload)

Parameters
  • brokers (str) – Comma-separated list of brokers, for example “192.168.1.1:9092,192.168.1.2:9092”.

  • topics (list) – Topics to consume.

  • group_id (str) – Consumer group identifier.

  • rdk_consumer_config (dict) – Rdkafka consumer settings.

  • rdk_topic_config (dict) – Rdkafka topic settings.

  • error_callback (Coroutine[asynckafka.exceptions.KafkaError]) – Coroutine with one argument (KafkaError). It is scheduled in the loop when an error occurs, for example when the broker is down.

  • loop (asyncio.AbstractEventLoop) – Asyncio event loop.

assignment()

Partition assignment of the consumer.

Returns

List with the current partition assignment of the consumer.

Return type

[asynckafka.consumer.topic_partition.TopicPartition]

commit_partitions()

Commit topic partitions.

Parameters

[asynckafka.consumer.topic_partition.TopicPartition] – topic partitions to commit.

is_consuming()

Check whether the consumer is consuming.

Returns

True if the consumer is consuming, False if not.

Return type

bool

is_stopped()

Check whether the consumer is stopped.

Returns

True if the consumer is stopped, False if not.

Return type

bool

start()

Start the consumer. This method must be called before consuming messages.

Raises
  • asynckafka.exceptions.ConsumerError – Error in the initialization of the consumer client.

  • asynckafka.exceptions.InvalidSetting – Invalid setting in consumer_settings or topic_settings.

  • asynckafka.exceptions.UnknownSetting – Unknown setting in consumer_settings or topic_settings.

stop()

Stop the consumer. It is advisable to call this method before closing the Python interpreter. It stops the async iterator and the asyncio tasks opened by this client, and frees the memory used by the consumer.

Raises
  • asynckafka.exceptions.ConsumerError – Error in the shut down of the consumer client.

  • asynckafka.exceptions.InvalidSetting – Invalid setting in consumer_settings or topic_settings.

  • asynckafka.exceptions.UnknownSetting – Unknown setting in consumer_settings or topic_settings.
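The consumer lifecycle described above (start, iterate, stop) can be sketched as a small helper. This is only an illustration under stated assumptions: start() and stop() are treated as plain calls, as in the example at the top of this section, and StubConsumer and StubMessage are hypothetical stand-ins so the snippet runs without a broker. They are not part of asynckafka.

```python
import asyncio


async def consume_some(consumer, limit):
    """Start the consumer, collect up to `limit` payloads, then stop it."""
    consumer.start()  # must be called before iterating
    payloads = []
    try:
        async for message in consumer:
            payloads.append(message.payload)
            if len(payloads) >= limit:
                break
    finally:
        # Stop even if the loop raises, so tasks and memory are released.
        consumer.stop()
    return payloads


class StubMessage:
    """Hypothetical stand-in for a message exposing only `payload`."""

    def __init__(self, payload):
        self.payload = payload


class StubConsumer:
    """Hypothetical stand-in with the start/stop/async-iterator surface."""

    def __init__(self, payloads):
        self._payloads = list(payloads)
        self.calls = []

    def start(self):
        self.calls.append("start")

    def stop(self):
        self.calls.append("stop")

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._payloads:
            raise StopAsyncIteration
        return StubMessage(self._payloads.pop(0))


stub = StubConsumer([b"a", b"b", b"c"])
result = asyncio.run(consume_some(stub, limit=2))
print(result, stub.calls)
```

With the real library, a Consumer built with the parameters listed above would be passed in place of StubConsumer.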

Producer

class asynckafka.Producer

TODO DOC

Parameters
  • brokers (str) – Comma-separated list of brokers, for example “192.168.1.1:9092,192.168.1.2:9092”.

  • rdk_producer_config (dict) – Rdkafka producer settings.

  • rdk_topic_config (dict) – Rdkafka topic settings.

  • error_callback (Coroutine[asynckafka.exceptions.KafkaError]) – Coroutine with one argument (KafkaError). It is scheduled in the loop when an error occurs, for example when the broker is down.

  • loop (asyncio.AbstractEventLoop) – Asyncio event loop.

produce()

Produce one message to a given topic. The producer must be running. The message is copied to internal queues before this coroutine returns and is then sent in batches, so completion of this coroutine does not guarantee delivery of the message.

Parameters
  • topic (str) –

  • message (bytes) –

  • key (bytes) –

start()

Start the producer. This method must be called before producing messages.

Raises
  • asynckafka.exceptions.ProducerError – Error in the initialization of the producer client.

  • asynckafka.exceptions.InvalidSetting – Invalid setting in producer_settings or topic_settings.

  • asynckafka.exceptions.UnknownSetting – Unknown setting in producer_settings or topic_settings.

stop()

Stop the producer. It is advisable to call this method before closing the Python interpreter. Once the producer is stopped, all calls to produce should raise a ProducerError.

Parameters

timeout (float, int) – Maximum time available to flush the messages.

Raises
  • asynckafka.exceptions.ProducerError – Error in the shut down of the producer client.

  • asynckafka.exceptions.InvalidSetting – Invalid setting in producer_settings or topic_settings.

  • asynckafka.exceptions.UnknownSetting – Unknown setting in producer_settings or topic_settings.
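Putting start(), produce() and stop() together, a producer session might look like the sketch below. It assumes produce() accepts its arguments in the order listed above (topic, message, key) and that stop() accepts timeout as a keyword. The send_all helper and the StubProducer class are hypothetical and exist only so the snippet runs without a broker.

```python
import asyncio


async def send_all(producer, topic, records, flush_timeout=10):
    """Start the producer, enqueue every (key, message) pair, then stop."""
    producer.start()  # must be called before producing
    sent = 0
    try:
        for key, message in records:
            # produce() only enqueues the message; delivery is not
            # confirmed when the coroutine returns.
            await producer.produce(topic, message, key)
            sent += 1
    finally:
        # stop() is given `flush_timeout` to flush what is still queued.
        producer.stop(timeout=flush_timeout)
    return sent


class StubProducer:
    """Hypothetical stand-in recording what would have been produced."""

    def __init__(self):
        self.queue = []
        self.stopped_with = None

    def start(self):
        pass

    async def produce(self, topic, message, key=None):
        self.queue.append((topic, key, message))

    def stop(self, timeout=None):
        self.stopped_with = timeout


stub = StubProducer()
records = [(b"k1", b"v1"), (b"k2", b"v2")]
count = asyncio.run(send_all(stub, "my_topic", records, flush_timeout=5))
print(count, stub.queue, stub.stopped_with)
```

Calling stop() in a finally block gives queued messages a chance to be flushed even when one of the produce() calls fails.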

Message

class asynckafka.Message

error

1 if the message is an error, 0 if not.
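Since error is a 0/1 flag, it can be used directly in a truth test. The sketch below separates good messages from error messages. The split_by_error helper is hypothetical, and SimpleNamespace objects stand in for real Message instances, carrying only the error attribute documented here and the payload attribute used in the consumer example.

```python
from types import SimpleNamespace


def split_by_error(messages):
    """Return (ok, failed) lists based on each message's error flag."""
    ok, failed = [], []
    for message in messages:
        # error is 1 for an error message and 0 otherwise.
        (failed if message.error else ok).append(message)
    return ok, failed


# Stand-in messages for illustration only.
sample = [
    SimpleNamespace(error=0, payload=b"a"),
    SimpleNamespace(error=1, payload=b""),
    SimpleNamespace(error=0, payload=b"b"),
]
ok, failed = split_by_error(sample)
print([m.payload for m in ok], len(failed))
```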

Kafka Error

class asynckafka.exceptions.KafkaError(rk_name, error_code, error_str, reason, consumer_or_producer)

Input of error callback.

property consumer_or_producer

Returns the producer or consumer that triggered the error.

Returns

Consumer or producer object that triggered the error.

Return type

Union[asynckafka.Consumer, asynckafka.Producer]

property error_code

Rdkafka error code.

Returns

RdKafka error code.

Return type

int

property error_str

Text provided by rdkafka with a human readable description of the error.

Returns

Text describing the error.

Return type

str

is_from_consumer()

Check if the error is triggered by a consumer.

Returns

True if the error was triggered by a consumer, False if not.

Return type

bool

is_from_producer()

Check if the error is triggered by a producer.

Returns

True if the error was triggered by a producer, False if not.

Return type

bool

property reason

Text provided by rdkafka describing possible causes of the error.

Returns

Text describing the reason of the error.

Return type

str

property rk_name

Rdkafka name of the producer or consumer that triggered the error_callback.

Returns

RDK name to identify the consumer or producer.

Return type

str
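The members documented above are enough to write a simple error_callback. The sketch below is one possible shape, not the library's own handler: describe_error, on_kafka_error and the logger name are hypothetical, and StubKafkaError is a stand-in with made-up sample values so the snippet runs without asynckafka installed.

```python
import asyncio
import logging

logger = logging.getLogger("kafka_errors")  # hypothetical logger name


def describe_error(error):
    """Build a one-line summary from the documented KafkaError members."""
    origin = "consumer" if error.is_from_consumer() else "producer"
    return (
        f"{origin} {error.rk_name}: "
        f"code={error.error_code} {error.error_str} (reason: {error.reason})"
    )


async def on_kafka_error(error):
    """Coroutine with one argument, the shape error_callback expects."""
    logger.error(describe_error(error))


class StubKafkaError:
    """Stand-in exposing the same members as KafkaError (illustration only)."""

    def __init__(self, rk_name, error_code, error_str, reason, from_consumer):
        self.rk_name = rk_name
        self.error_code = error_code
        self.error_str = error_str
        self.reason = reason
        self._from_consumer = from_consumer

    def is_from_consumer(self):
        return self._from_consumer

    def is_from_producer(self):
        return not self._from_consumer


# Sample values are made up for the demonstration.
stub = StubKafkaError(
    "rdkafka#consumer-1", -195, "Broker transport failure",
    "Connection refused", True,
)
summary = describe_error(stub)
asyncio.run(on_kafka_error(stub))
```

A coroutine like on_kafka_error would be passed as the error_callback parameter of a Consumer or Producer.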