rabbitmq-server/deps/rabbitmq_stream/docs/PROTOCOL.adoc

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

841 lines
19 KiB
Plaintext
Raw Permalink Normal View History

= RabbitMQ Streams Protocol Reference
2020-05-25 21:55:51 +08:00
2023-03-21 18:58:50 +08:00
This is the reference of the RabbitMQ Streams protocol.
2020-06-09 19:53:49 +08:00
The https://github.com/rabbitmq/rabbitmq-stream-java-client[RabbitMQ Stream Java client]
2020-06-09 19:53:49 +08:00
is currently the reference implementation.
2020-05-25 21:55:51 +08:00
== Types
int8, int16, int32, int64 - Signed integers (big endian order)
uint8, uint16, uint32, uint64 - Unsigned integers (big endian order)
2020-05-25 21:55:51 +08:00
bytes - int32 for the length followed by the bytes of content, length of -1 indicates null.
string - int16 for the length followed by the bytes of UTF8-encoded content, length of -1 indicates null.
2020-05-25 21:55:51 +08:00
arrays - int32 for the length followed by the repetition of the structure, notation uses [], e.g.
[int32] for an array of int32.
== Frame Structure
```
Frame => Size (Request | Response | Command)
Size => uint32 (size without the 4 bytes of the size element)
2020-05-25 21:55:51 +08:00
Request => Key Version (CorrelationId) Content
Key => uint16
Version => uint16
CorrelationId => uint32
2020-05-25 21:55:51 +08:00
Command => bytes // see command details below
Response => Key Version CorrelationId ResponseCode
Key => uint16
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
2020-05-25 21:55:51 +08:00
Command => Key Version Content
Key => uint16
Version => uint16
2020-05-25 21:55:51 +08:00
Content => bytes // see command details below
```
Most commands are request/reply, but some commands (e.g. `Deliver`) are one-direction only and thus
does not contain a correlation ID.
Some responses may carry additional information than just the response code, this is specified in the command definition.
2020-05-25 21:55:51 +08:00
Keys are uint16, but the actual value is defined on the last 15 bits, the most significant bit being
used to make the difference between a request (0) and a response (1). Example for `subscribe`
(key is 6):
```
0x0006 => subscribe request
0x8006 => subscribe response
```
== Response Codes
.Stream Protocol Response Codes
|===
|Response|Code
|OK|0x01
|Stream does not exist|0x02
|Subscription ID already exists|0x03
|Subscription ID does not exist|0x04
|Stream already exists|0x05
|Stream not available|0x06
|SASL mechanism not supported|0x07
|Authentication failure|0x08
|SASL error|0x09
|SASL challenge|0x0a
|SASL authentication failure loopback|0x0b
|Virtual host access failure|0x0c
|Unknown frame|0x0d
|Frame too large|0x0e
|Internal error|0x0f
|Access refused|0x10
|Precondition failed|0x11
|Publisher does not exist|0x12
|No offset|0x13
|===
2020-05-25 21:55:51 +08:00
== Commands
.Stream Protocol Commands
|===
|Command |From |Key | Expects response?
|<<declarepublisher>>
2020-05-25 21:55:51 +08:00
|Client
|0x0001
|Yes
2020-05-25 21:55:51 +08:00
|<<publish>>
|Client
|0x0002
2020-05-25 21:55:51 +08:00
|No
|<<publishconfirm>>
|Server
|0x0003
|No
|<<publisherror>>
|Server
|0x0004
|No
|<<querypublishersequence>>
|Client
|0x0005
|Yes
|<<deletepublisher>>
|Client
|0x0006
|Yes
|<<subscribe>>
|Client
|0x0007
|Yes
|<<deliver>>
|Server
|0x0008
|No
|<<credit>>
|Client
|0x0009
|No
|<<storeoffset>>
|Client
|0x000a
|No
|<<queryoffset>>
|Client
|0x000b
|Yes
|<<unsubscribe>>
|Client
|0x000c
|Yes
|<<create>>
|Client
|0x000d
|Yes
|<<delete>>
|Client
|0x000e
|Yes
|<<metadata>>
|Client
|0x000f
|Yes
|<<metadataupdate>>
|Server
|0x0010
|No
|<<peerproperties>>
2020-09-15 23:52:05 +08:00
|Client
|0x0011
|Yes
2020-09-15 23:52:05 +08:00
|<<saslhandshake>>
2020-09-15 23:52:05 +08:00
|Client
|0x0012
2020-09-15 23:52:05 +08:00
|Yes
|<<saslauthenticate>>
|Client
|0x0013
|Yes
|<<tune>>
|Server
|0x0014
|Yes
|<<open>>
|Client
|0x0015
2020-11-26 17:02:13 +08:00
|Yes
|<<close>>
|Client & Server
|0x0016
|Yes
|<<heartbeat>>
|Client & Server
|0x0017
|No
|<<route>>
|Client
|0x0018
|Yes
|<<partitions>>
|Client
|0x0019
|Yes
|<<consumerupdate>>
|Server
|0x001a
|Yes
|<<exchangecommandversions>>
|Client
|0x001b
|Yes
|<<streamstats>>
|Client
|0x001c
|Yes
|<<createsuperstream>>
|Client
|0x001d
|Yes
|<<deletesuperstream>>
|Client
|0x001e
|Yes
2020-05-25 21:55:51 +08:00
|===
=== DeclarePublisher
```
DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
Key => uint16 // 0x0001
Version => uint16
CorrelationId => uint32
PublisherId => uint8
PublisherReference => string // max 256 characters
Stream => string
DeclarePublisherResponse => Key Version CorrelationId ResponseCode
Key => uint16 // 0x8001
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
```
2020-05-25 21:55:51 +08:00
=== Publish
2023-05-23 20:41:31 +08:00
Version 1
```
Publish => Key Version PublisherId PublishedMessages
Key => uint16 // 0x0002
Version => uint16
PublisherId => uint8
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
PublishingId => uint64
Message => bytes
```
Version 2
2020-05-25 21:55:51 +08:00
```
2021-07-21 16:13:55 +08:00
Publish => Key Version PublisherId PublishedMessages
Key => uint16 // 0x0002
Version => uint16
PublisherId => uint8
2020-05-25 21:55:51 +08:00
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
PublishingId => uint64
2023-05-23 20:41:31 +08:00
FilterValue => string
2020-05-25 21:55:51 +08:00
Message => bytes
```
1. Use version 1 if there is no filter value.
2. Use version 2 if there is a filter value.
2020-05-25 21:55:51 +08:00
=== PublishConfirm
```
PublishConfirm => Key Version PublishingIds
Key => uint16 // 0x0003
Version => uint16
PublisherId => uint8
PublishingIds => [uint64] // to correlate with the messages sent
2020-05-25 21:55:51 +08:00
```
=== PublishError
```
PublishError => Key Version [PublishingError]
Key => uint16 // 0x0004
Version => uint16
PublisherId => uint8
PublishingError => PublishingId Code
PublishingId => uint64
Code => uint16 // code to identify the problem
```
=== QueryPublisherSequence
```
QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
Key => uint16 // 0x0005
Version => uint16
CorrelationId => uint32
PublisherReference => string // max 256 characters
Stream => string
QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
Key => uint16 // 0x8005
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Sequence => uint64
```
=== DeletePublisher
```
DeletePublisherRequest => Key Version CorrelationId PublisherId
Key => uint16 // 0x0006
Version => uint16
CorrelationId => uint32
PublisherId => uint8
DeletePublisherResponse => Key Version CorrelationId ResponseCode
Key => uint16 // 0x8006
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
```
=== Subscribe
```
Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit Properties
Key => uint16 // 0x0007
Version => uint16
CorrelationId => uint32 // correlation id to correlate the response
SubscriptionId => uint8 // client-supplied id to identify the subscription
Stream => string // the name of the stream
OffsetSpecification => OffsetType Offset
OffsetType => uint16 // 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
Offset => uint64 (for offset) | int64 (for timestamp)
Credit => uint16
Properties => [Property]
Property => Key Value
Key => string
Value => string
```
NB: Timestamp is https://www.erlang.org/doc/apps/erts/time_correction.html#Erlang_System_Time[Erlang system time],
milliseconds from epoch
Supported properties:
* `single-active-consumer`: set to `true` to enable https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/[single active consumer] for this subscription.
* `super-stream`: set to the name of the super stream the subscribed is a partition of.
* `filter.` (e.g. `filter.0`, `filter.1`, etc): prefix to use to define filter values for the subscription.
* `match-unfiltered`: whether to return messages without any filter value or not.
=== Deliver
Version 1
```
Deliver => Key Version SubscriptionId OsirisChunk
Key => uint16 // 0x0008
Version => uint16
SubscriptionId => uint8
OsirisChunk => MagicVersion ChunkType NumEntries NumRecords Timestamp Epoch ChunkFirstOffset ChunkCrc DataLength TrailerLength BloomSize Reserved Messages
MagicVersion => int8
ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
NumEntries => uint16
NumRecords => uint32
Timestamp => int64 // erlang system time in milliseconds, since epoch
Epoch => uint64
ChunkFirstOffset => uint64
ChunkCrc => int32
DataLength => uint32
TrailerLength => uint32
BloomSize => uint8 // size of bloom filter data, ignored at the moment
2024-05-27 06:26:12 +08:00
Reserved => uint24 // 24 bits reserved for future use
2024-05-27 06:21:26 +08:00
Messages => [Message] // a continous collection of messages, the size of the array is defined by NumEntries
Message => EntryTypeAndSize
Data => bytes
```
Version 2
```
Deliver => Key Version SubscriptionId CommittedOffset OsirisChunk
Key => uint16 // 0x0008
Version => uint16
SubscriptionId => uint8
CommittedChunkId => uint64
OsirisChunk => MagicVersion ChunkType NumEntries NumRecords Timestamp Epoch ChunkFirstOffset ChunkCrc DataLength TrailerLength BloomSize Reserved Messages
MagicVersion => int8
ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
NumEntries => uint16
NumRecords => uint32
Timestamp => int64 // erlang system time in milliseconds, since epoch
Epoch => uint64
ChunkFirstOffset => uint64
ChunkCrc => int32
DataLength => uint32
TrailerLength => uint32
BloomSize => uint8 // size of bloom filter data, ignored at the moment
2024-05-27 06:26:12 +08:00
Reserved => uint24 // 24 bits reserved for future use
2024-05-27 06:21:26 +08:00
Messages => [Message] // a continous collection of messages, the size of the array is defined by NumEntries
Message => EntryTypeAndSize
Data => bytes
```
NB: See the https://github.com/rabbitmq/osiris/blob/12a430b11be2c2be3f26ce4f2d7268954c7ec02b/src/osiris_log.erl#L126-L195[Osiris project]
for details on the structure of messages.
=== Credit
```
Credit => Key Version SubscriptionId Credit
Key => uint16 // 0x0009
Version => uint16
SubscriptionId => uint8
Credit => uint16 // the number of chunks that can be sent
CreditResponse => Key Version ResponseCode SubscriptionId
Key => uint16 // 0x8009
Version => uint16
ResponseCode => uint16
SubscriptionId => uint8
```
NB: the server sent a response only in case of problem, e.g. crediting an unknown subscription.
=== StoreOffset
```
StoreOffset => Key Version Reference Stream Offset
Key => uint16 // 0x000a
Version => uint16
Reference => string // max 256 characters
Stream => string // the name of the stream
Offset => uint64
```
=== QueryOffset
```
QueryOffsetRequest => Key Version CorrelationId Reference Stream
Key => uint16 // 0x000b
Version => uint16
CorrelationId => uint32
Reference => string // max 256 characters
Stream => string
QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
Key => uint16 // 0x800b
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Offset => uint64
```
=== Unsubscribe
```
Unsubscribe => Key Version CorrelationId SubscriptionId
Key => uint16 // 0x000c
Version => uint16
CorrelationId => uint32
SubscriptionId => uint8
UnsubscribeResponse => Key Version CorrelationId ResponseCode
Key => uint16 // 0x800c
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
```
=== Create
```
Create => Key Version CorrelationId Stream Arguments
Key => uint16 // 0x000d
Version => uint16
CorrelationId => uint32
Stream => string
Arguments => [Argument]
Argument => Key Value
Key => string
Value => string
```
=== Delete
```
Delete => Key Version CorrelationId Stream
Key => uint16 // 0x000e
Version => uint16
CorrelationId => uint32
Stream => string
```
=== Metadata
```
MetadataQuery => Key Version CorrelationId [Stream]
Key => uint16 // 0x000f
Version => uint16
CorrelationId => uint32
Stream => string
MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
Key => uint16 // 0x800f
Version => uint16
CorrelationId => uint32
Broker => Reference Host Port
Reference => uint16
Host => string
Port => uint32
StreamMetadata => StreamName ResponseCode LeaderReference ReplicasReferences
StreamName => string
ResponseCode => uint16
LeaderReference => uint16
ReplicasReferences => [uint16]
```
=== MetadataUpdate
```
MetadataUpdate => Key Version MetadataInfo
Key => uint16 // 0x0010
Version => uint16
MetadataInfo => Code Stream
Code => uint16 // code to identify the information
Stream => string // the stream implied
```
=== PeerProperties
```
PeerPropertiesRequest => Key Version PeerProperties
Key => uint16 // 0x0011
Version => uint16
CorrelationId => uint32
PeerProperties => [PeerProperty]
PeerProperty => Key Value
Key => string
Value => string
PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
Key => uint16 // 0x8011
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
PeerProperties => [PeerProperty]
PeerProperty => Key Value
Key => string
Value => string
```
=== SaslHandshake
2020-09-15 23:52:05 +08:00
```
SaslHandshakeRequest => Key Version CorrelationId Mechanism
Key => uint16 // 0x0012
Version => uint16
CorrelationId => uint32
2020-09-15 23:52:05 +08:00
SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanisms]
Key => uint16 // 0x8012
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Mechanisms => [Mechanism]
Mechanism => string
2020-09-15 23:52:05 +08:00
```
=== SaslAuthenticate
```
SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
Key => uint16 // 0x0013
Version => uint16
CorrelationId => uint32
Mechanism => string
SaslOpaqueData => bytes
SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
Key => uint16 // 0x8013
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
SaslOpaqueData => bytes
```
=== Tune
```
TuneRequest => Key Version FrameMax Heartbeat
Key => uint16 // 0x0014
Version => uint16
FrameMax => uint32 // in bytes, 0 means no limit
Heartbeat => uint32 // in seconds, 0 means no heartbeat
TuneResponse => TuneRequest
```
=== Open
2020-11-26 17:02:13 +08:00
```
OpenRequest => Key Version CorrelationId VirtualHost
Key => uint16 // 0x0015
Version => uint16
CorrelationId => uint32
VirtualHost => string
2020-11-26 17:02:13 +08:00
OpenResponse => Key Version CorrelationId ResponseCode ConnectionProperties
Key => uint16 // 0x8015
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
ConnectionProperties => [ConnectionProperty]
ConnectionProperty => Key Value
Key => string
Value => string
2020-11-26 17:02:13 +08:00
```
=== Close
```
CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
Key => uint16 // 0x0016
Version => uint16
CorrelationId => uint32
ClosingCode => uint16
ClosingReason => string
CloseResponse => Key Version CorrelationId ResponseCode
Key => uint16 // 0x8016
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
```
=== Heartbeat
```
Heartbeat => Key Version
Key => uint16 // 0x0017
Version => uint16
```
2020-05-25 23:53:34 +08:00
=== Route
```
RouteQuery => Key Version CorrelationId RoutingKey SuperStream
Key => uint16 // 0x0018
Version => uint16
CorrelationId => uint32
RoutingKey => string
SuperStream => string
RouteResponse => Key Version CorrelationId ResponseCode [Stream]
Key => uint16 // 0x8018
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Stream => string
```
2021-02-26 18:48:45 +08:00
=== Partitions
```
PartitionsQuery => Key Version CorrelationId SuperStream
Key => uint16 // 0x0019
Version => uint16
CorrelationId => uint32
SuperStream => string
PartitionsResponse => Key Version CorrelationId ResponseCode [Stream]
Key => uint16 // 0x8019
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Stream => string
```
=== ConsumerUpdate
```
ConsumerUpdateQuery => Key Version CorrelationId SubscriptionId Active
Key => uint16 // 0x001a
Version => uint16
CorrelationId => uint32
SubscriptionId => uint8
Active => uint8 (boolean, 0 = false, 1 = true)
ConsumerUpdateResponse => Key Version CorrelationId ResponseCode OffsetSpecification
Key => uint16 // 0x801a
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
OffsetSpecification => OffsetType Offset
OffsetType => uint16 // 0 (none), 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
Offset => uint64 (for offset) | int64 (for timestamp)
```
=== ExchangeCommandVersions
```
CommandVersionsExchangeRequest => Key Version CorrelationId [Command]
Key => uint16 // 0x001b
Version => uint16
CorrelationId => uint32
Command => Key MinVersion MaxVersion
Key => uint16
MinVersion => uint16
MaxVersion => uint16
CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Command]
Key => uint16 // 0x801b
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Command => Key MinVersion MaxVersion
Key => uint16
MinVersion => uint16
MaxVersion => uint16
```
=== StreamStats
```
StreamStatsRequest => Key Version CorrelationId Stream
Key => uint16 // 0x001c
Version => uint16
CorrelationId => uint32
Stream => string
StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
Key => uint16 // 0x801c
Version => uint16
CorrelationId => uint32
ResponseCode => uint16
Stats => [Statistic]
Statistic => Key Value
Key => string
Value => int64
```
=== CreateSuperStream
```
CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments
Key => uint16 // 0x001d
Version => uint16
CorrelationId => uint32
Name => string
Partition => string
BindingKey => string
Arguments => [Argument]
Argument => Key Value
Key => string
Value => string
```
=== DeleteSuperStream
```
Delete => Key Version CorrelationId Name
Key => uint16 // 0x001e
Version => uint16
CorrelationId => uint32
Name => string
```
2020-05-25 23:53:34 +08:00
== Authentication
Once a client is connected to the server, it initiates an authentication
sequence. The next figure shows the steps of the sequence:
[ditaa]
.Authentication Sequence
....
Client Server
+ +
| Peer Properties Exchange |
|-------------------------->|
|<--------------------------|
| |
2020-05-25 23:53:34 +08:00
| SASL Handshake |
|-------------------------->|
|<--------------------------|
| |
| SASL Authenticate |
|-------------------------->|
|<--------------------------|
| |
| Tune |
|<--------------------------|
|-------------------------->|
| |
| Open |
|-------------------------->|
|<--------------------------|
| |
+ +
....
* SaslHandshake: the client asks about the SASL mechanisms the server supports. It
can then pick one from the list the server returns.
* SaslAuthenticate: the client answers to the server's challenge(s), using the
SASL mechanism it picked. The server will send a `Tune` frame once it is satisfied
with the client authentication response.
2020-05-25 23:56:02 +08:00
* Tune: the server sends a `Tune` frame to suggest some settings (max frame size, heartbeat).
The client answers with a `Tune` frame with the settings he agrees on, possibly adjusted
from the server's suggestions.
* Open: the client sends an `Open` frame to pick a virtual host to connect to. The server
answers whether it accepts the access or not.