Consume
type: "io.kestra.plugin.pulsar.Consume"
Consume messages from Pulsar topic(s).
Examples
id: pulsar_consume
namespace: company.team
tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:26650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
Properties
deserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
initialPosition
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Earliest
- Possible Values:
Latest
Earliest
The position of a subscription to the topic.
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
2.000000000
- Format:
duration
How long to wait for a record to be polled.
If no records are available, this is the maximum time to wait for a new record.
subscriptionName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The subscription name.
With a subscription name, only records that have not been consumed yet are fetched.
subscriptionType
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Exclusive
- Possible Values:
Exclusive
Shared
Failover
Key_Shared
The subscription type.
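As an illustration of how initialPosition and subscriptionType fit together, the sketch below sets both explicitly instead of relying on the defaults (Earliest and Exclusive). The flow id, topic, and subscription name are hypothetical placeholders; only the property names and enum values come from this page.

```yaml
id: pulsar_consume_shared          # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow  # hypothetical subscription name
    subscriptionType: Shared       # one of Exclusive, Shared, Failover, Key_Shared
    initialPosition: Latest        # one of Latest, Earliest
```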
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Pulsar topic(s) to consume messages from.
Can be a string or a list of strings to consume from multiple topics.
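A minimal sketch of the list form, assuming two hypothetical topics named orders and payments. Passing a single string, as in the example at the top of the page, remains valid.

```yaml
id: pulsar_consume_multi_topic     # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic:                         # list form: consume from several topics
      - orders                     # hypothetical topic name
      - payments                   # hypothetical topic name
    deserializer: JSON
    subscriptionName: kestra_flow
```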
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs.
You need to specify a Pulsar protocol URL.
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication token.
Authentication token that can be required by some providers such as Clever Cloud.
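Because authenticationToken is a dynamic property, the token can be rendered from a Kestra secret rather than written into the flow. The sketch below assumes a secret named PULSAR_TOKEN has been configured and uses a placeholder broker host. Neither value comes from this page.

```yaml
id: pulsar_consume_with_token      # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar+ssl://pulsar.us-west.example.com:6651   # placeholder host
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
    # PULSAR_TOKEN is an assumed secret name, not defined by the plugin
    authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```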
consumerName
- Type: string
- Dynamic: ✔️
- Required: ❌
The consumer name.
consumerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the consumer.
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add a public encryption key to the producer/consumer.
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The maximum duration to wait for new records.
It's not a hard limit and is evaluated every second.
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of records to fetch before stopping.
It's not a hard limit and is evaluated every second.
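The three limits (pollDuration, maxDuration, and maxRecords) can be combined to bound a run. The sketch below uses ISO 8601 duration strings, which is the usual way to write duration-formatted properties in Kestra flows. The specific values are illustrative assumptions, not recommendations.

```yaml
id: pulsar_consume_bounded         # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
    pollDuration: PT5S             # wait up to 5 seconds for each poll
    maxRecords: 100                # stop after roughly 100 records
    maxDuration: PT30S             # or after roughly 30 seconds
```

Since both maxRecords and maxDuration are soft limits evaluated every second, a run may fetch slightly more records or last slightly longer than the configured values.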
schemaString
- Type: string
- Dynamic: ✔️
- Required: ❌
JSON string of the topic's schema.
Required for connecting to topics with a defined schema and strict schema checking.
schemaType
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default:
NONE
- Possible Values:
NONE
AVRO
JSON
The schema type of the topic.
Can be one of NONE, AVRO or JSON. NONE means no schema is enforced.
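A sketch of how schemaType and schemaString might be set together for a topic with an Avro schema. The Order record and its fields are invented for illustration, and the deserializer value is simply carried over from the example at the top of the page. The right deserializer for a schema-enforced topic may differ.

```yaml
id: pulsar_consume_avro            # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON             # assumption: reused from the page's example
    subscriptionName: kestra_flow
    schemaType: AVRO
    # Hypothetical schema; replace with the schema registered on the topic
    schemaString: |
      {
        "type": "record",
        "name": "Order",
        "fields": [
          {"name": "id", "type": "string"},
          {"name": "amount", "type": "double"}
        ]
      }
```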
tlsOptions
- Type: AbstractPulsarConnection-TlsOptions
- Dynamic: ❌
- Required: ❌
TLS authentication options.
You need to use "pulsar+ssl://" in the uri (the Pulsar service URL) to enable TLS support.
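A sketch of a TLS-enabled configuration, pairing a pulsar+ssl:// URI with the ca, cert, and key fields described under Definitions. The secret names are hypothetical, and each secret is assumed to hold a base64-encoded PEM file. This page marks the dynamic status of these fields as unknown, so whether the secret() expressions are rendered is also an assumption. Literal base64 strings can be used instead.

```yaml
id: pulsar_consume_tls             # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar+ssl://pulsar.us-west.example.com:6651   # placeholder host
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow
    tlsOptions:
      # Assumed secret names; each value must be a base64-encoded PEM file
      ca: "{{ secret('PULSAR_CA_B64') }}"
      cert: "{{ secret('PULSAR_CERT_B64') }}"
      key: "{{ secret('PULSAR_KEY_B64') }}"
```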
Outputs
messagesCount
- Type: integer
- Required: ❌
Number of messages consumed.
uri
- Type: string
- Required: ❌
- Format:
uri
URI of a Kestra internal storage file containing the consumed messages.
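The two outputs can be referenced from a later task through the usual outputs expression. The sketch below assumes a follow-up Log task from the Kestra core plugin. The task id report and the message text are illustrative.

```yaml
id: pulsar_consume_and_report      # hypothetical flow id
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.pulsar.Consume
    uri: pulsar://localhost:6650
    topic: test_kestra
    deserializer: JSON
    subscriptionName: kestra_flow

  - id: report                     # hypothetical downstream task
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ outputs.consume.messagesCount }} messages, stored at {{ outputs.consume.uri }}"
```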
Definitions
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
Properties
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The CA certificate.
Must be a base64-encoded PEM file.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate.
Must be a base64-encoded PEM file.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The client private key.
Must be a base64-encoded PEM file.