= RabbitMQ Stream Protocol Reference
== Types
* `int8`, `int16`, `int32`, `int64` - signed integers (big-endian order)
* `uint16`, `uint32`, `uint64` - unsigned integers (big-endian order)
* `bytes` - `int32` for the length followed by the bytes of content; a length of -1 indicates null
* `string` - `int16` for the length followed by the bytes of content; a length of -1 indicates null
* arrays - `int32` for the length followed by the repetition of the structure; the notation uses `[]`, e.g. `[int32]` for an array of `int32`
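The encodings above can be sketched with Python's standard `struct` module. This is a minimal illustration, not a reference implementation: the helper names are hypothetical, and UTF-8 for string content is an assumption, since the text above does not name a character encoding.

```python
import struct

def encode_string(value):
    """string: int16 length + content; None is encoded as length -1."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")  # assumption: UTF-8 content
    return struct.pack(">h", len(data)) + data

def encode_bytes(value):
    """bytes: int32 length + content; None is encoded as length -1."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value

def encode_array(items, encode_item):
    """array: int32 element count followed by each encoded element."""
    return struct.pack(">i", len(items)) + b"".join(encode_item(i) for i in items)

def decode_string(buf, pos=0):
    """Return (value, next_position); a length of -1 decodes to None."""
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    if length < 0:
        return None, pos
    return buf[pos:pos + length].decode("utf-8"), pos + length
```

The `>` prefix in the format strings selects big-endian order without padding, which matches the integer types listed above.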
== Frame Structure
```
Frame => Size (Request | Response | Command)
  Size => int32 (size without the 4 bytes of the size element)

Request => Key Version (CorrelationId) Content
  Key => int16
  Version => int16
  CorrelationId => int32
  Content => bytes // see command details below

Response => Key Version CorrelationId ResponseCode
  Key => int16
  Version => int16
  CorrelationId => int32
  ResponseCode => int16

Command => Key Version Content
  Key => int16
  Version => int16
  Content => bytes // see command details below
```
Most commands are request/reply, but some commands (e.g. `Deliver`) are one-direction only and thus
do not contain a correlation ID.
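As a rough sketch of the framing described above, the following Python helpers prefix a payload with its size and split a receive buffer into complete frames. The function names are hypothetical. Version `0` in the test values is an assumed placeholder, since this section does not state which version number to send.

```python
import struct

def build_frame(key, version, content, correlation_id=None):
    """Prefix the payload with its size; the size excludes its own 4 bytes."""
    payload = struct.pack(">hh", key, version)
    if correlation_id is not None:
        # One-direction commands such as Deliver carry no correlation ID.
        payload += struct.pack(">i", correlation_id)
    payload += content
    return struct.pack(">i", len(payload)) + payload

def split_frames(buffer):
    """Return (complete_payloads, remaining_bytes) from a receive buffer."""
    frames = []
    pos = 0
    while len(buffer) - pos >= 4:
        (size,) = struct.unpack_from(">i", buffer, pos)
        if len(buffer) - pos - 4 < size:
            break  # the frame is not complete yet, wait for more bytes
        frames.append(buffer[pos + 4:pos + 4 + size])
        pos += 4 + size
    return frames, buffer[pos:]
```

Returning the unconsumed tail lets the caller prepend it to the next chunk read from the socket, so frames split across reads are reassembled.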
== Commands
.Stream Protocol Commands
|===
|Command |From |Key | Expects response?
|<<publish>>
|Client
|0
|No
|<<publishconfirm>>
|Server
|1
|No
|<<subscribe>>
|Client
|2
|Yes
|<<deliver>>
|Server
|3
|No
|<<credit>>
|Client
|4
|No
|<<unsubscribe>>
|Client
|5
|Yes
|<<publisherror>>
|Server
|6
|No
|<<metadataupdate>>
|Server
|7
|No
|<<metadata>>
|Client
|8
|Yes
|<<saslhandshake>>
|Client
|9
|Yes
|<<saslauthenticate>>
|Client
|10
|Yes
|<<tune>>
|Server
|11
|Yes
|<<open>>
|Client
|12
|Yes
|<<close>>
|Client & Server
|13
|Yes
|<<heartbeat>>
|Client & Server
|14
|No
|<<create>>
|Client
|998
|Yes
|<<delete>>
|Client
|999
|Yes
|===
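For dispatching on incoming frames, the keys in the table above can be collected in an enumeration. This is an illustrative sketch: `CommandKey` and `command_key` are hypothetical names, not part of any client library.

```python
import struct
from enum import IntEnum

class CommandKey(IntEnum):
    """Command keys as listed in the command table."""
    PUBLISH = 0
    PUBLISH_CONFIRM = 1
    SUBSCRIBE = 2
    DELIVER = 3
    CREDIT = 4
    UNSUBSCRIBE = 5
    PUBLISH_ERROR = 6
    METADATA_UPDATE = 7
    METADATA = 8
    SASL_HANDSHAKE = 9
    SASL_AUTHENTICATE = 10
    TUNE = 11
    OPEN = 12
    CLOSE = 13
    HEARTBEAT = 14
    CREATE = 998
    DELETE = 999

def command_key(payload):
    """Read the key from a frame payload (the bytes after the size prefix)."""
    (key,) = struct.unpack_from(">h", payload, 0)
    return CommandKey(key)  # raises ValueError for an unknown key
```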
=== Publish
```
Publish => Key Version Stream PublishedMessages
  Key => int16 // 0
  Version => int16
  Stream => string // the name of the stream
  PublishedMessages => [PublishedMessage]
  PublishedMessage => PublishingId Message
  PublishingId => int64 // echoed back in PublishConfirm and PublishError
  Message => bytes
```
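A minimal sketch of encoding a `Publish` frame in Python follows. It assumes that `PublishingId` is an `int64` (matching the `[int64]` in `PublishConfirm`), that stream names are UTF-8, and that version `0` is acceptable. `encode_publish` is a hypothetical helper name.

```python
import struct

def encode_publish(stream, messages, version=0):
    """messages is a list of (publishing_id, payload_bytes) pairs."""
    name = stream.encode("utf-8")                          # assumption: UTF-8 names
    body = struct.pack(">hh", 0, version)                  # Key = 0, Version
    body += struct.pack(">h", len(name)) + name            # Stream => string
    body += struct.pack(">i", len(messages))               # array length
    for publishing_id, payload in messages:
        body += struct.pack(">q", publishing_id)           # PublishingId (assumed int64)
        body += struct.pack(">i", len(payload)) + payload  # Message => bytes
    return struct.pack(">i", len(body)) + body             # Size prefix
```

For example, `encode_publish("invoices", [(1, b"hello")])` yields one frame carrying a single message with publishing ID 1; the stream name here is purely illustrative.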
=== PublishConfirm
```
PublishConfirm => Key Version PublishingIds
  Key => int16 // 1
  Version => int16
  PublishingIds => [int64] // to correlate with the messages sent
```
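On the client side, decoding a `PublishConfirm` payload (the frame without its size prefix) could look like the following sketch. `decode_publish_confirm` is a hypothetical name.

```python
import struct

def decode_publish_confirm(payload):
    """Return the list of confirmed publishing IDs."""
    key, _version, count = struct.unpack_from(">hhi", payload, 0)
    if key != 1:
        raise ValueError("not a PublishConfirm frame")
    # The IDs follow the 8-byte header as `count` big-endian int64 values.
    return list(struct.unpack_from(">%dq" % count, payload, 8))
```

A publisher would typically remove the returned IDs from its set of unconfirmed messages.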
=== Subscribe
```
Subscribe => Key Version CorrelationId SubscriptionId Stream Offset
  Key => int16 // 2
  Version => int16
  CorrelationId => int32 // correlation id to correlate the response
  SubscriptionId => int32 // client-supplied id to identify the subscription
  Stream => string // the name of the stream
  Offset => uint64 // the offset to start consuming from
```
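The `Subscribe` layout above can be encoded as in this Python sketch. The helper name is hypothetical, and UTF-8 stream names and version `0` are assumptions.

```python
import struct

def encode_subscribe(correlation_id, subscription_id, stream, offset, version=0):
    """Build a Subscribe frame, including the leading size prefix."""
    name = stream.encode("utf-8")  # assumption: UTF-8 names
    body = struct.pack(">hhii", 2, version, correlation_id, subscription_id)
    body += struct.pack(">h", len(name)) + name  # Stream => string
    body += struct.pack(">Q", offset)            # Offset => uint64
    return struct.pack(">i", len(body)) + body
```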
=== Deliver
```
Deliver => Key Version SubscriptionId OsirisChunk
  Key => int16 // 3
  Version => int16
  SubscriptionId => int32
  OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
  MagicVersion => int8
  NumEntries => uint16
  NumRecords => uint32
  Epoch => uint64
  ChunkFirstOffset => uint64
  ChunkCrc => int32
  DataLength => uint32
  Messages => [Message] // no int32 for the size for this array
  Message => EntryTypeAndSize Data
  Data => bytes
```
NB: See the https://github.com/rabbitmq/osiris/blob/8e30a2c78902f2721c1bd6a0d331a8b82cfe774c/src/osiris_log.erl#L46-L77[Osiris project]
for details on the structure of messages.
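As an illustration, the fixed-size part of `OsirisChunk` can be read with a single `struct` format. The sketch follows only the field list in this section; the actual Osiris layout may differ between versions, so treat it as an assumption to check against the linked source. `ChunkHeader` and `decode_chunk_header` are hypothetical names, and the input is expected to start at `MagicVersion`.

```python
import struct
from collections import namedtuple

ChunkHeader = namedtuple(
    "ChunkHeader",
    "magic_version num_entries num_records epoch first_offset crc data_length",
)

# int8, uint16, uint32, uint64, uint64, int32, uint32 in big-endian order.
CHUNK_HEADER = struct.Struct(">bHIQQiI")

def decode_chunk_header(chunk):
    """Return (header, data), where data is the raw messages section."""
    header = ChunkHeader(*CHUNK_HEADER.unpack_from(chunk, 0))
    start = CHUNK_HEADER.size
    return header, chunk[start:start + header.data_length]
```

Splitting `data` into individual messages is left out on purpose, because the entry encoding is defined by Osiris rather than by this document.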
=== Credit
```
Credit => Key Version SubscriptionId Credit
  Key => int16 // 4
  Version => int16
  SubscriptionId => int32
  Credit => int16 // the number of chunks that can be sent
```
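A consumer grants the server permission to send more chunks by emitting `Credit` frames. The following is a small sketch of the encoding, with a hypothetical helper name and an assumed version `0`.

```python
import struct

def encode_credit(subscription_id, credit, version=0):
    """Build a Credit frame allowing `credit` more chunks on a subscription."""
    body = struct.pack(">hhih", 4, version, subscription_id, credit)
    return struct.pack(">i", len(body)) + body
```

One possible policy, not mandated by this document, is to send `encode_credit(subscription_id, 1)` after each `Deliver` frame has been processed, so the number of chunks in flight stays constant.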
=== Unsubscribe
```
Unsubscribe => Key Version CorrelationId SubscriptionId
  Key => int16 // 5
  Version => int16
  CorrelationId => int32
  SubscriptionId => int32
```
=== PublishError
```
PublishError => Key Version [PublishingError]
  Key => int16 // 6
  Version => int16
  PublishingError => PublishingId Code
  PublishingId => int64
  Code => int16 // code to identify the problem
```
=== MetadataUpdate
```
MetadataUpdate => Key Version MetadataInfo
  Key => int16 // 7
  Version => int16
  MetadataInfo => Code Stream
  Code => int16 // code to identify the information
  Stream => string // the stream implied
```
=== Metadata
```
MetadataQuery => Key Version CorrelationId [Stream]
  Key => int16 // 8
  Version => int16
  CorrelationId => int32
  Stream => string

MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
  Key => int16 // 8
  Version => int16
  CorrelationId => int32
  Broker => Reference Host Port
  Reference => int16
  Host => string
  Port => int32
  StreamMetadata => StreamName ResponseCode LeaderReference ReplicasReferences
  StreamName => string
  ResponseCode => int16
  LeaderReference => int16
  ReplicasReferences => [int16]
```
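The query side of the exchange can be sketched as follows. `encode_metadata_query` is a hypothetical helper; UTF-8 stream names and version `0` are assumptions.

```python
import struct

def encode_metadata_query(correlation_id, streams, version=0):
    """Build a MetadataQuery frame for a list of stream names."""
    body = struct.pack(">hhi", 8, version, correlation_id)
    body += struct.pack(">i", len(streams))          # array length
    for stream in streams:
        name = stream.encode("utf-8")                # assumption: UTF-8 names
        body += struct.pack(">h", len(name)) + name  # Stream => string
    return struct.pack(">i", len(body)) + body
```

In the response, the `int16` references in each stream's metadata point at entries of the `Broker` array, so a client would usually build a reference-to-broker map before resolving leaders and replicas.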
=== SaslHandshake
```
SaslHandshakeRequest => Key Version CorrelationId Mechanism
  Key => int16 // 9
  Version => int16
  CorrelationId => int32

SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
  Key => int16 // 9
  Version => int16
  CorrelationId => int32
  ResponseCode => int16
  Mechanism => string
```
=== SaslAuthenticate
```
SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
  Key => int16 // 10
  Version => int16
  CorrelationId => int32
  Mechanism => string
  SaslOpaqueData => bytes

SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
  Key => int16 // 10
  Version => int16
  CorrelationId => int32
  ResponseCode => int16
  SaslOpaqueData => bytes
```
=== Tune
```
TuneRequest => Key Version FrameMax Heartbeat
  Key => int16 // 11, to identify the command
  Version => int16
  FrameMax => int32 // in bytes, 0 means no limit
  Heartbeat => int32 // in seconds, 0 means no heartbeat

TuneResponse => TuneRequest
```
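Because `TuneResponse` has the same layout as `TuneRequest`, a client can decode the server's suggestion and answer with its own values in one step. The negotiation rule below (take the smaller non-zero value) is an assumption chosen for illustration; this document does not prescribe how a client should adjust the settings. Function names are hypothetical.

```python
import struct

def negotiate(server_value, client_value):
    """Assumed policy: the smaller non-zero value wins; 0 means no limit."""
    if server_value == 0:
        return client_value
    if client_value == 0:
        return server_value
    return min(server_value, client_value)

def answer_tune(payload, client_frame_max, client_heartbeat):
    """Decode a Tune payload from the server and build the client's answer."""
    key, version, frame_max, heartbeat = struct.unpack_from(">hhii", payload, 0)
    if key != 11:
        raise ValueError("not a Tune frame")
    body = struct.pack(
        ">hhii",
        11,
        version,
        negotiate(frame_max, client_frame_max),
        negotiate(heartbeat, client_heartbeat),
    )
    return struct.pack(">i", len(body)) + body
```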
=== Open
```
OpenRequest => Key Version CorrelationId VirtualHost
  Key => int16 // 12
  Version => int16
  CorrelationId => int32
  VirtualHost => string

OpenResponse => Key Version CorrelationId ResponseCode
  Key => int16 // 12
  Version => int16
  CorrelationId => int32
  ResponseCode => int16
```
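Responses such as `OpenResponse` share the `Key Version CorrelationId ResponseCode` prefix, so a client can decode them generically and match them to pending requests by correlation ID. The following is a minimal sketch; `Response`, `decode_response`, and `PendingRequests` are hypothetical names, and the meaning of individual response codes is not covered here.

```python
import struct
from collections import namedtuple

Response = namedtuple("Response", "key version correlation_id response_code")

def decode_response(payload):
    """Decode the common response prefix from a frame payload."""
    return Response(*struct.unpack_from(">hhih", payload, 0))

class PendingRequests:
    """Track outstanding requests by correlation ID."""

    def __init__(self):
        self._next_id = 0
        self._pending = {}

    def register(self, command_name):
        """Allocate a correlation ID for a request about to be sent."""
        self._next_id += 1
        self._pending[self._next_id] = command_name
        return self._next_id

    def resolve(self, response):
        """Return the command the response answers; KeyError if unknown."""
        return self._pending.pop(response.correlation_id)
```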
=== Close
```
CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
  Key => int16 // 13
  Version => int16
  CorrelationId => int32
  ClosingCode => int16
  ClosingReason => string

CloseResponse => Key Version CorrelationId ResponseCode
  Key => int16 // 13
  Version => int16
  CorrelationId => int32
  ResponseCode => int16
```
=== Heartbeat
```
Heartbeat => Key Version
  Key => int16 // 14
  Version => int16
```
=== Create
```
Create => Key Version CorrelationId Stream Arguments
  Key => int16 // 998
  Version => int16
  CorrelationId => int32
  Stream => string
  Arguments => [Argument]
  Argument => Key Value
  Key => string
  Value => string
```
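The `Arguments` array is a list of string key/value pairs. A sketch of encoding `Create` in Python follows, using a hypothetical `encode_create` helper. UTF-8 strings and version `0` are assumptions, and the argument names a server accepts are not listed in this document.

```python
import struct

def _string(value):
    """string: int16 length + content (UTF-8 assumed)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_create(correlation_id, stream, arguments, version=0):
    """Build a Create frame; `arguments` is a dict of str -> str."""
    body = struct.pack(">hhi", 998, version, correlation_id)
    body += _string(stream)
    body += struct.pack(">i", len(arguments))  # array length
    for key, value in arguments.items():
        body += _string(key) + _string(value)  # Argument => Key Value
    return struct.pack(">i", len(body)) + body
```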
=== Delete
```
Delete => Key Version CorrelationId Stream
  Key => int16 // 999
  Version => int16
  CorrelationId => int32
  Stream => string
```
== Authentication
Once a client is connected to the server, it initiates an authentication
sequence. The next figure shows the steps of the sequence:
[ditaa]
.Authentication Sequence
....
Client                  Server
+                          +
|      SASL Handshake      |
|-------------------------->|
|<--------------------------|
|                          |
|    SASL Authenticate     |
|-------------------------->|
|<--------------------------|
|                          |
|           Tune           |
|<--------------------------|
|-------------------------->|
|                          |
|           Open           |
|-------------------------->|
|<--------------------------|
|                          |
+                          +
....
* SaslHandshake: the client asks which SASL mechanisms the server supports. It
can then pick one from the list the server returns.
* SaslAuthenticate: the client answers the server's challenge(s), using the
SASL mechanism it picked. The server sends a `Tune` frame once it is satisfied
with the client's authentication response.
* Tune: the server sends a `Tune` frame to suggest some settings (maximum frame size, heartbeat).
The client answers with a `Tune` frame containing the settings it agrees on, possibly adjusted
from the server's suggestion.
* Open: the client sends an `Open` frame to pick the virtual host to connect to. The server
answers whether it accepts the access or not.
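The client-initiated `SaslAuthenticate` and `Open` steps of the sequence can be sketched as below. The example assumes the `PLAIN` mechanism was offered in the handshake response. For `PLAIN`, the opaque data follows RFC 4616: an empty authorization identity, a NUL byte, the user name, a NUL byte, and the password. The function names, version `0`, and UTF-8 strings are assumptions.

```python
import struct

def _string(value):
    """string: int16 length + content (UTF-8 assumed)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def _frame(body):
    """Prefix a payload with its int32 size."""
    return struct.pack(">i", len(body)) + body

def sasl_authenticate_plain(correlation_id, username, password, version=0):
    """Build a SaslAuthenticate request carrying PLAIN credentials."""
    opaque = b"\x00" + username.encode("utf-8") + b"\x00" + password.encode("utf-8")
    body = struct.pack(">hhi", 10, version, correlation_id)
    body += _string("PLAIN")                        # Mechanism => string
    body += struct.pack(">i", len(opaque)) + opaque  # SaslOpaqueData => bytes
    return _frame(body)

def open_request(correlation_id, virtual_host, version=0):
    """Build an Open request for the given virtual host."""
    body = struct.pack(">hhi", 12, version, correlation_id) + _string(virtual_host)
    return _frame(body)
```

A client would send the authenticate frame, wait for the response, answer the server's `Tune` frame, and only then send the `Open` frame.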