rabbitmq-server/deps/rabbitmq_stream/docs/PROTOCOL.adoc

651 lines
13 KiB
Plaintext
Raw Normal View History

2020-05-25 21:55:51 +08:00
= RabbitMQ Stream Protocol Reference
2020-06-09 19:53:49 +08:00
This is the reference of the RabbitMQ stream protocol. Note the protocol
is still under development and is subject to change.
The https://github.com/rabbitmq/rabbitmq-stream-java-client[RabbitMQ Stream Java client]
2020-06-09 19:53:49 +08:00
is currently the reference implementation.
2020-05-25 21:55:51 +08:00
== Types
int8, int16, int32, int64 - Signed integers (big endian order)
uint8, uint16, uint32, uint64 - Unsigned integers (big endian order)
2020-05-25 21:55:51 +08:00
bytes - int32 for the length followed by the bytes of content, length of -1 indicates null.
string - int16 for the length followed by the bytes of content, length of -1 indicates null.
arrays - int32 for the length followed by the repetition of the structure, notation uses [], e.g.
[int32] for an array of int32.
== Frame Structure
```
Frame => Size (Request | Response | Command)
Size => int32 (size without the 4 bytes of the size element)
Request => Key Version (CorrelationId) Content
Key => int16
Version => int16
CorrelationId => int32
Command => bytes // see command details below
Response => Key Version CorrelationId ResponseCode
Key => int16
Version => int16
CorrelationId => int32
ResponseCode => int16
Command => Key Version Content
Key => int16
Version => int16
Content => bytes // see command details below
```
Most commands are request/reply, but some commands (e.g. `Deliver`) are one-direction only and thus
does not contain a correlation ID.
Some responses may carry additional information than just the response code, this is specified in the command definition.
2020-05-25 21:55:51 +08:00
Keys are int16, but the actual value is defined on the last 15 bits, the most significant bit being
used to make the difference between a request (0) and a response (1). Example for `subscribe`
(key is 6):
```
0b00000000 00000110 => subscribe request
0b10000000 00000110 => subscribe response
```
== Response Codes
.Stream Protocol Response Codes
|===
|Response|Code
2021-02-26 18:48:45 +08:00
|OK|1
|Stream does not exist|2
|Subscription ID already exists|3
|Subscription ID does not exist|4
|Stream already exists|5
|Stream not available|6
|SASL mechanism not supported|7
|Authentication failure|8
|SASL error|9
|SASL challenge|10
|SASL authentication failure loopback|11
|Virtual host access failure|12
|Unknown frame|13
|Frame too large|14
|Internal error|15
|Access refused|16
|Precondition failed|17
|Publisher does not exist|18
|===
2020-05-25 21:55:51 +08:00
== Commands
.Stream Protocol Commands
|===
|Command |From |Key | Expects response?
|<<declarepublisher>>
2020-05-25 21:55:51 +08:00
|Client
2021-02-26 18:48:45 +08:00
|1
|Yes
2020-05-25 21:55:51 +08:00
|<<publish>>
|Client
2021-02-26 18:48:45 +08:00
|2
2020-05-25 21:55:51 +08:00
|No
|<<publishconfirm>>
|Server
2021-02-26 18:48:45 +08:00
|3
|No
|<<publisherror>>
|Server
2021-02-26 18:48:45 +08:00
|4
|No
|<<querypublishersequence>>
|Client
2021-02-26 18:48:45 +08:00
|5
|Yes
|<<deletepublisher>>
|Client
2021-02-26 18:48:45 +08:00
|6
|Yes
|<<subscribe>>
|Client
2021-02-26 18:48:45 +08:00
|7
|Yes
|<<deliver>>
|Server
2021-02-26 18:48:45 +08:00
|8
|No
|<<credit>>
|Client
2021-02-26 18:48:45 +08:00
|9
|No
|<<commitoffset>>
|Client
2021-02-26 18:48:45 +08:00
|10
|No
|<<queryoffset>>
|Client
2021-02-26 18:48:45 +08:00
|11
|Yes
|<<unsubscribe>>
|Client
2021-02-26 18:48:45 +08:00
|12
|Yes
|<<create>>
|Client
2021-02-26 18:48:45 +08:00
|13
|Yes
|<<delete>>
|Client
2021-02-26 18:48:45 +08:00
|14
|Yes
|<<metadata>>
|Client
2021-02-26 18:48:45 +08:00
|15
|Yes
|<<metadataupdate>>
|Server
2021-02-26 18:48:45 +08:00
|16
|No
|<<peerproperties>>
2020-09-15 23:52:05 +08:00
|Client
2021-02-26 18:48:45 +08:00
|17
|Yes
2020-09-15 23:52:05 +08:00
|<<saslhandshake>>
2020-09-15 23:52:05 +08:00
|Client
2021-02-26 18:48:45 +08:00
|18
2020-09-15 23:52:05 +08:00
|Yes
|<<saslauthenticate>>
|Client
2021-02-26 18:48:45 +08:00
|19
|Yes
|<<tune>>
|Server
2021-02-26 18:48:45 +08:00
|20
|Yes
|<<open>>
|Server
2021-02-26 18:48:45 +08:00
|21
2020-11-26 17:02:13 +08:00
|Yes
|<<close>>
|Client & Server
2021-02-26 18:48:45 +08:00
|22
|Yes
|<<heartbeat>>
|Client & Server
2021-02-26 18:48:45 +08:00
|23
|No
|<<route>> (experimental)
|Client
2021-02-26 18:48:45 +08:00
|24
|Yes
|<<partitions>> (experimental)
|Client
2021-02-26 18:48:45 +08:00
|25
|Yes
2020-05-25 21:55:51 +08:00
|===
=== DeclarePublisher
```
DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
2021-02-26 18:48:45 +08:00
Key => int16 // 1
Version => int16
CorrelationId => int32
PublisherId => uint8
PublisherReference => string // max 256 characters
Stream => string
DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
2021-02-26 18:48:45 +08:00
Key => int16 // 1
Version => int16
CorrelationId => int32
ResponseCode => int16
```
2020-05-25 21:55:51 +08:00
=== Publish
```
Publish => Key Version Stream PublishedMessages
2021-02-26 18:48:45 +08:00
Key => int16 // 2
2020-05-25 21:55:51 +08:00
Version => int16
PublisherId => uint8
2020-05-25 21:55:51 +08:00
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
PublishingId => int64
2020-05-25 21:55:51 +08:00
Message => bytes
```
=== PublishConfirm
```
PublishConfirm => Key Version PublishingIds
2021-02-26 18:48:45 +08:00
Key => int16 // 3
2020-05-25 21:55:51 +08:00
Version => int16
PublisherId => uint8
2020-05-25 21:55:51 +08:00
PublishingIds => [int64] // to correlate with the messages sent
```
=== PublishError
```
PublishError => Key Version [PublishingError]
2021-02-26 18:48:45 +08:00
Key => int16 // 4
Version => int16
PublisherId => uint8
PublishingError => PublishingId Code
PublishingId => int64
Code => int16 // code to identify the problem
```
=== QueryPublisherSequence
```
QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
2021-02-26 18:48:45 +08:00
Key => int16 // 5
Version => int16
CorrelationId => int32
PublisherReference => string // max 256 characters
Stream => string
QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
2021-02-26 18:48:45 +08:00
Key => int16 // 5
Version => int16
CorrelationId => int32
ResponseCode => int16
Sequence => uint64
```
=== DeletePublisher
```
DeletePublisherRequest => Key Version CorrelationId PublisherId
2021-02-26 18:48:45 +08:00
Key => int16 // 6
Version => int16
CorrelationId => int32
PublisherId => uint8
DeletePublisherResponse => Key Version CorrelationId ResponseCode
2021-02-26 18:48:45 +08:00
Key => int16 // 6
Version => int16
CorrelationId => int32
ResponseCode => int16
```
=== Subscribe
```
Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit
2021-02-26 18:48:45 +08:00
Key => int16 // 7
Version => int16
CorrelationId => int32 // correlation id to correlate the response
SubscriptionId => uint8 // client-supplied id to identify the subscription
Stream => string // the name of the stream
OffsetSpecification => OffsetType Offset
2021-02-26 18:48:45 +08:00
OffsetType => int16 // 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
Offset => uint64 (for offset) | int64 (for timestamp)
Credit => int16
```
=== Deliver
```
Deliver => Key Version SubscriptionId OsirisChunk
2021-02-26 18:48:45 +08:00
Key => int16 // 8
Version => int32
SubscriptionId => uint8
OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
MagicVersion => int8
NumEntries => uint16
NumRecords => uint32
Epoch => uint64
ChunkFirstOffset => uint64
ChunkCrc => int32
DataLength => uint32
Messages => [Message] // no int32 for the size for this array
Message => EntryTypeAndSize
Data => bytes
```
2020-06-02 17:13:19 +08:00
NB: See the https://github.com/rabbitmq/osiris/blob/348db0528986d6025b823bcf1ae0570aa63f5e25/src/osiris_log.erl#L49-L81[Osiris project]
for details on the structure of messages.
=== Credit
```
Credit => Key Version SubscriptionId Credit
2021-02-26 18:48:45 +08:00
Key => int16 // 9
Version => int16
SubscriptionId => int8
Credit => int16 // the number of chunks that can be sent
CreditResponse => Key Version ResponseCode SubscriptionId
2021-02-26 18:48:45 +08:00
Key => int16 // 9
Version => int16
ResponseCode => int16
SubscriptionId => int8
```
NB: the server sent a response only in case of problem, e.g. crediting an unknown subscription.
=== CommitOffset
```
CommitOffset => Key Version Reference Stream Offset
2021-02-26 18:48:45 +08:00
Key => int16 // 10
Version => int16
CorrelationId => int32 // not used yet
Reference => string // max 256 characters
SubscriptionId => uint8
Offset => int64
```
=== QueryOffset
```
QueryOffsetRequest => Key Version CorrelationId Reference Stream
2021-02-26 18:48:45 +08:00
Key => int16 // 11
Version => int16
CorrelationId => int32
Reference => string // max 256 characters
Stream => string
QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
2021-02-26 18:48:45 +08:00
Key => int16 // 11
Version => int16
CorrelationId => int32
ResponseCode => int16
Offset => uint64
```
=== Unsubscribe
```
Unsubscribe => Key Version CorrelationId SubscriptionId
2021-02-26 18:48:45 +08:00
Key => int16 // 12
Version => int16
CorrelationId => int32
SubscriptionId => int8
```
=== Create
```
Create => Key Version CorrelationId Stream Arguments
2021-02-26 18:48:45 +08:00
Key => int16 // 13
Version => int16
CorrelationId => int32
Stream => string
Arguments => [Argument]
Argument => Key Value
Key => string
Value => string
```
=== Delete
```
Delete => Key Version CorrelationId Stream
2021-02-26 18:48:45 +08:00
Key => int16 // 14
Version => int16
CorrelationId => int32
Stream => string
```
=== Metadata
```
MetadataQuery => Key Version CorrelationId [Stream]
2021-02-26 18:48:45 +08:00
Key => int16 // 15
Version => int16
CorrelationId => int32
Stream => string
MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
2021-02-26 18:48:45 +08:00
Key => int16 // 15
Version => int16
CorrelationId => int32
Broker => Reference Host Port
Reference => int16
Host => string
Port => int32
StreamMetadata => StreamName LeaderReference ReplicasReferences
StreamName => string
ResponseCode => int16
LeaderReference => int16
ReplicasReferences => [int16]
```
=== MetadataUpdate
```
MetadataUpdate => Key Version MetadataInfo
2021-02-26 18:48:45 +08:00
Key => int16 // 16
Version => int16
MetadataInfo => Code Stream
Code => int16 // code to identify the information
Stream => string // the stream implied
```
=== PeerProperties
```
PeerPropertiesRequest => Key Version PeerProperties
2021-02-26 18:48:45 +08:00
Key => int16 // 17
Version => int16
CorrelationId => int32
PeerProperties => [PeerProperty]
PeerProperty => Key Value
Key => string
Value => string
PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
2021-02-26 18:48:45 +08:00
Key => int16 // 17
Version => int16
CorrelationId => int32
ResponseCode => int16
PeerProperties => [PeerProperty]
PeerProperty => Key Value
Key => string
Value => string
```
=== SaslHandshake
2020-09-15 23:52:05 +08:00
```
SaslHandshakeRequest => Key Version CorrelationId Mechanism
2021-02-26 18:48:45 +08:00
Key => int16 // 18
2020-09-15 23:52:05 +08:00
Version => int16
CorrelationId => int32
SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
2021-02-26 18:48:45 +08:00
Key => int16 // 18
2020-09-15 23:52:05 +08:00
Version => int16
CorrelationId => int32
ResponseCode => int16
Mechanism => string
2020-09-15 23:52:05 +08:00
```
=== SaslAuthenticate
```
SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
2021-02-26 18:48:45 +08:00
Key => int16 // 19
Version => int16
CorrelationId => int32
Mechanism => string
SaslOpaqueData => bytes
SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
2021-02-26 18:48:45 +08:00
Key => int16 // 19
Version => int16
CorrelationId => int32
ResponseCode => int16
SaslOpaqueData => bytes
```
=== Tune
```
TuneRequest => Key Version FrameMax Heartbeat
2021-02-26 18:48:45 +08:00
Key => int16 // 20
Version => int16
FrameMax => int32 // in bytes, 0 means no limit
Heartbeat => int32 // in seconds, 0 means no heartbeat
TuneResponse => TuneRequest
```
=== Open
2020-11-26 17:02:13 +08:00
```
OpenRequest => Key Version CorrelationId VirtualHost
2021-02-26 18:48:45 +08:00
Key => int16 // 21
2020-11-26 17:02:13 +08:00
Version => int16
CorrelationId => int32
VirtualHost => string
2020-11-26 17:02:13 +08:00
OpenResponse => Key Version CorrelationId ResponseCode
2021-02-26 18:48:45 +08:00
Key => int16 // 21
2020-11-26 17:02:13 +08:00
Version => int16
CorrelationId => int32
ResponseCode => int16
```
=== Close
```
CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
2021-02-26 18:48:45 +08:00
Key => int16 // 22
Version => int16
CorrelationId => int32
ClosingCode => int16
ClosingReason => string
CloseResponse => Key Version CorrelationId ResponseCode
2021-02-26 18:48:45 +08:00
Key => int16 // 22
Version => int16
CorrelationId => int32
ResponseCode => int16
```
=== Heartbeat
```
Heartbeat => Key Version
2021-02-26 18:48:45 +08:00
Key => int16 // 23
Version => int16
```
2020-05-25 23:53:34 +08:00
=== Route
_Experimental_
```
RouteQuery => Key Version CorrelationId RoutingKey SuperStream
2021-02-26 18:48:45 +08:00
Key => int16 // 24
Version => int16
CorrelationId => int32
RoutingKey => string
SuperStream => string
RouteResponse => Key Version CorrelationId Stream
2021-02-26 18:48:45 +08:00
Key => int16 // 24
Version => int16
CorrelationId => int32
Stream => string
```
2021-02-26 18:48:45 +08:00
=== Partitions
_Experimental_
```
PartitionsQuery => Key Version CorrelationId SuperStream
2021-02-26 18:48:45 +08:00
Key => int16 // 25
Version => int16
CorrelationId => int32
SuperStream => string
PartitionsResponse => Key Version CorrelationId [Stream]
2021-02-26 18:48:45 +08:00
Key => int16 // 25
Version => int16
CorrelationId => int32
Stream => string
```
2020-05-25 23:53:34 +08:00
== Authentication
Once a client is connected to the server, it initiates an authentication
sequence. The next figure shows the steps of the sequence:
[ditaa]
.Authentication Sequence
....
Client Server
+ +
| Peer Properties Exchange |
|-------------------------->|
|<--------------------------|
| |
2020-05-25 23:53:34 +08:00
| SASL Handshake |
|-------------------------->|
|<--------------------------|
| |
| SASL Authenticate |
|-------------------------->|
|<--------------------------|
| |
| Tune |
|<--------------------------|
|-------------------------->|
| |
| Open |
|-------------------------->|
|<--------------------------|
| |
+ +
....
* SaslHandshake: the client asks about the SASL mechanisms the server supports. It
can then pick one from the list the server returns.
* SaslAuthenticate: the client answers to the server's challenge(s), using the
SASL mechanism it picked. The server will send a `Tune` frame once it is satisfied
with the client authentication response.
2020-05-25 23:56:02 +08:00
* Tune: the server sends a `Tune` frame to suggest some settings (max frame size, heartbeat).
The client answers with a `Tune` frame with the settings he agrees on, possibly adjusted
from the server's suggestions.
* Open: the client sends an `Open` frame to pick a virtual host to connect to. The server
answers whether it accepts the access or not.