= RabbitMQ Stream Protocol Reference
This is the reference for the RabbitMQ stream protocol. Note that the protocol
is still under development and is subject to change.
The https://github.com/rabbitmq/rabbitmq-stream-java-client[RabbitMQ stream Java client]
is currently the reference implementation.
== Types
int8, int16, int32, int64 - Signed integers (big endian order)
uint8, uint16, uint32, uint64 - Unsigned integers (big endian order)
bytes - int32 for the length followed by the bytes of content, length of -1 indicates null.
string - int16 for the length followed by the bytes of content, length of -1 indicates null.
arrays - int32 for the length followed by the repetition of the structure, notation uses [], e.g.
[int32] for an array of int32.
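The type rules above can be sketched as a few Python helpers built on the standard `struct` module. This is only an illustration: the helper names are hypothetical, the use of UTF-8 for strings is an assumption (the text above does not name an encoding), and the array length is assumed to be the number of elements rather than a byte count.

```python
import struct


def encode_string(value):
    """string: int16 length followed by the content; None becomes length -1."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")  # assumption: strings are UTF-8 encoded
    return struct.pack(">h", len(data)) + data


def encode_bytes(value):
    """bytes: int32 length followed by the content; None becomes length -1."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value


def encode_array(items, encode_item):
    """array: int32 length (assumed: element count) followed by each element."""
    return struct.pack(">i", len(items)) + b"".join(encode_item(i) for i in items)


if __name__ == "__main__":
    print(encode_string("s1").hex())
    print(encode_bytes(None).hex())
    print(encode_array([1, 2], lambda n: struct.pack(">i", n)).hex())
```

The `>` prefix in every format string selects big endian order with no padding, which matches the integer types listed above.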
== Frame Structure
```
Frame => Size (Request | Response | Command)
Size => int32 (size without the 4 bytes of the size element)
Request => Key Version (CorrelationId) Content
Key => int16
Version => int16
CorrelationId => int32
Content => bytes // see command details below
Response => Key Version CorrelationId ResponseCode
Key => int16
Version => int16
CorrelationId => int32
ResponseCode => int16
Command => Key Version Content
Key => int16
Version => int16
Content => bytes // see command details below
```
Most commands are request/reply, but some commands (e.g. `Deliver`) are one-direction only and thus
do not contain a correlation ID.
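As a rough illustration of the framing described above, the following Python sketch prefixes a payload with its size and splits a receive buffer back into complete frames. The function names are hypothetical and the version value `0` used in the example is a placeholder, since this document does not state the version number.

```python
import struct


def frame(payload):
    """Prefix a payload with its int32 size (the size excludes its own 4 bytes)."""
    return struct.pack(">i", len(payload)) + payload


def split_frames(buffer):
    """Return (complete_payloads, leftover_bytes) for a receive buffer."""
    payloads = []
    while len(buffer) >= 4:
        (size,) = struct.unpack_from(">i", buffer, 0)
        if len(buffer) < 4 + size:
            break  # the rest of this frame has not arrived yet
        payloads.append(buffer[4:4 + size])
        buffer = buffer[4 + size:]
    return payloads, buffer


def read_key_version(payload):
    """Every request, response and command starts with Key and Version."""
    return struct.unpack_from(">hh", payload, 0)


if __name__ == "__main__":
    heartbeat = struct.pack(">hh", 22, 0)  # key 22, placeholder version 0
    payloads, rest = split_frames(frame(heartbeat) + frame(heartbeat)[:3])
    print(payloads, rest)
```

Keeping the leftover bytes lets the caller append the next network read and call `split_frames` again, which is one simple way to handle frames that span several reads.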
== Commands
.Stream Protocol Commands
|===
|Command |From |Key | Expects response?
|<<declarepublisher>>
|Client
|0
|Yes
|<<publish>>
|Client
|1
|No
|<<publishconfirm>>
|Server
|2
|No
|<<publisherror>>
|Server
|3
|No
|<<querypublishersequence>>
|Client
|4
|Yes
|<<deletepublisher>>
|Client
|5
|Yes
|<<subscribe>>
|Client
|6
|Yes
|<<deliver>>
|Server
|7
|No
|<<credit>>
|Client
|8
|No
|<<commitoffset>>
|Client
|9
|No
|<<queryoffset>>
|Client
|10
|Yes
|<<unsubscribe>>
|Client
|11
|Yes
|<<create>>
|Client
|12
|Yes
|<<delete>>
|Client
|13
|Yes
|<<metadata>>
|Client
|14
|Yes
|<<metadataupdate>>
|Server
|15
|No
|<<peerproperties>>
|Client
|16
|Yes
|<<saslhandshake>>
|Client
|17
|Yes
|<<saslauthenticate>>
|Client
|18
|Yes
|<<tune>>
|Server
|19
|Yes
|<<open>>
|Client
|20
|Yes
|<<close>>
|Client & Server
|21
|Yes
|<<heartbeat>>
|Client & Server
|22
|No
|===
=== DeclarePublisher
```
DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
Key => int16 // 0
Version => int16
CorrelationId => int32
PublisherId => uint8
PublisherReference => string // max 256 characters
Stream => string
DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
Key => int16 // 0
Version => int16
CorrelationId => int32
ResponseCode => int16
```
=== Publish
```
Publish => Key Version PublisherId PublishedMessages
Key => int16 // 1
Version => int16
PublisherId => uint8
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
PublishingId => int64
Message => bytes
```
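A minimal sketch of building a `Publish` frame in Python, following the field list above (Key, Version, PublisherId, then the array of messages). The function name is hypothetical, the version value `0` is a placeholder, and the array length is assumed to be the number of messages.

```python
import struct

PUBLISH_KEY = 1
VERSION = 0  # placeholder: this document does not state the version number


def encode_publish(publisher_id, messages):
    """Build a Publish frame from a list of (publishing_id, body) pairs."""
    payload = struct.pack(">hhB", PUBLISH_KEY, VERSION, publisher_id)
    payload += struct.pack(">i", len(messages))  # assumed: number of messages
    for publishing_id, body in messages:
        # PublishingId is an int64, Message is bytes (int32 length + content)
        payload += struct.pack(">qi", publishing_id, len(body)) + body
    return struct.pack(">i", len(payload)) + payload


if __name__ == "__main__":
    print(encode_publish(1, [(42, b"hi")]).hex())
```

The publishing IDs chosen here are what the server echoes back in `PublishConfirm` and `PublishError`, so a client would typically keep them until one of those frames arrives.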
=== PublishConfirm
```
PublishConfirm => Key Version PublisherId PublishingIds
Key => int16 // 2
Version => int16
PublisherId => uint8
PublishingIds => [int64] // to correlate with the messages sent
```
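On the receiving side, a client could decode a `PublishConfirm` payload along these lines. This is a sketch based on the field list above (Key, Version, PublisherId, PublishingIds); the function name is hypothetical and the payload is assumed to have had its 4-byte size prefix already removed.

```python
import struct

PUBLISH_CONFIRM_KEY = 2


def decode_publish_confirm(payload):
    """Return (publisher_id, publishing_ids) from a PublishConfirm payload."""
    key, _version, publisher_id = struct.unpack_from(">hhB", payload, 0)
    if key != PUBLISH_CONFIRM_KEY:
        raise ValueError("not a PublishConfirm frame: key %d" % key)
    (count,) = struct.unpack_from(">i", payload, 5)
    publishing_ids = struct.unpack_from(">%dq" % count, payload, 9)
    return publisher_id, list(publishing_ids)


if __name__ == "__main__":
    sample = struct.pack(">hhBi2q", 2, 0, 7, 2, 10, 11)
    print(decode_publish_confirm(sample))
```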
=== PublishError
```
PublishError => Key Version PublisherId [PublishingError]
Key => int16 // 3
Version => int16
PublisherId => uint8
PublishingError => PublishingId Code
PublishingId => int64
Code => int16 // code to identify the problem
```
=== QueryPublisherSequence
```
QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
Key => int16 // 4
Version => int16
CorrelationId => int32
PublisherReference => string // max 256 characters
Stream => string
QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
Key => int16 // 4
Version => int16
CorrelationId => int32
ResponseCode => int16
Sequence => uint64
```
=== DeletePublisher
```
DeletePublisherRequest => Key Version CorrelationId PublisherId
Key => int16 // 5
Version => int16
CorrelationId => int32
PublisherId => uint8
DeletePublisherResponse => Key Version CorrelationId ResponseCode
Key => int16 // 5
Version => int16
CorrelationId => int32
ResponseCode => int16
```
=== Subscribe
```
Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit
Key => int16 // 6
Version => int16
CorrelationId => int32 // correlation id to correlate the response
SubscriptionId => uint8 // client-supplied id to identify the subscription
Stream => string // the name of the stream
OffsetSpecification => OffsetType Offset
OffsetType => int16 // 0 (first), 1 (last), 2 (next), 3 (offset), 4 (timestamp)
Offset => uint64 (for offset) | int64 (for timestamp)
Credit => int16
```
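The `Subscribe` layout can be sketched in Python as follows. Several points are assumptions rather than statements of this reference: the `Offset` field is written only for the `offset` and `timestamp` types (one reading of the `Offset` line above), strings are UTF-8, the version value `0` is a placeholder, and the function name is hypothetical.

```python
import struct

SUBSCRIBE_KEY = 6
VERSION = 0  # placeholder: this document does not state the version number
OFFSET_TYPES = {"first": 0, "last": 1, "next": 2, "offset": 3, "timestamp": 4}


def encode_subscribe(correlation_id, subscription_id, stream,
                     offset_type, offset=0, credit=1):
    """Build a Subscribe frame for the given stream and offset specification."""
    name = stream.encode("utf-8")
    payload = struct.pack(">hhiB", SUBSCRIBE_KEY, VERSION,
                          correlation_id, subscription_id)
    payload += struct.pack(">h", len(name)) + name
    payload += struct.pack(">h", OFFSET_TYPES[offset_type])
    # Assumption: Offset is only present for the two types that need a value.
    if offset_type == "offset":
        payload += struct.pack(">Q", offset)  # uint64
    elif offset_type == "timestamp":
        payload += struct.pack(">q", offset)  # int64
    payload += struct.pack(">h", credit)
    return struct.pack(">i", len(payload)) + payload


if __name__ == "__main__":
    print(encode_subscribe(1, 0, "invoices", "first", credit=10).hex())
    print(encode_subscribe(2, 1, "invoices", "offset", offset=500).hex())
```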
=== Deliver
```
Deliver => Key Version SubscriptionId OsirisChunk
Key => int16 // 7
Version => int16
SubscriptionId => uint8
OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
MagicVersion => int8
NumEntries => uint16
NumRecords => uint32
Epoch => uint64
ChunkFirstOffset => uint64
ChunkCrc => int32
DataLength => uint32
Messages => [Message] // no int32 for the size for this array
Message => EntryTypeAndSize Data
Data => bytes
```
NB: See the https://github.com/rabbitmq/osiris/blob/348db0528986d6025b823bcf1ae0570aa63f5e25/src/osiris_log.erl#L49-L81[Osiris project]
for details on the structure of messages.
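As an illustration, the fixed-size part of a `Deliver` frame could be decoded like this in Python. The sketch assumes `Version` is an int16 as in every other command, stops at the chunk header, and returns the message data undecoded, since the entry format is defined by Osiris. The names `ChunkHeader` and `decode_deliver` are hypothetical.

```python
import struct
from collections import namedtuple

DELIVER_KEY = 7
ChunkHeader = namedtuple(
    "ChunkHeader",
    "magic_version num_entries num_records epoch first_offset crc data_length",
)
# int8, uint16, uint32, uint64, uint64, int32, uint32 in big endian order
CHUNK_HEADER = struct.Struct(">bHIQQiI")


def decode_deliver(payload):
    """Return (subscription_id, chunk_header, raw_message_data)."""
    key, _version, subscription_id = struct.unpack_from(">hhB", payload, 0)
    if key != DELIVER_KEY:
        raise ValueError("not a Deliver frame: key %d" % key)
    header = ChunkHeader._make(CHUNK_HEADER.unpack_from(payload, 5))
    start = 5 + CHUNK_HEADER.size
    data = payload[start:start + header.data_length]
    return subscription_id, header, data


if __name__ == "__main__":
    # Sample values only; they are not taken from a real chunk.
    sample = (struct.pack(">hhB", 7, 0, 3)
              + CHUNK_HEADER.pack(0x50, 1, 1, 2, 100, 0, 4) + b"abcd")
    print(decode_deliver(sample))
```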
=== Credit
```
Credit => Key Version SubscriptionId Credit
Key => int16 // 8
Version => int16
SubscriptionId => uint8
Credit => int16 // the number of chunks that can be sent
CreditResponse => Key Version ResponseCode SubscriptionId
Key => int16 // 8
Version => int16
ResponseCode => int16
SubscriptionId => uint8
```
NB: the server sends a response only if there is a problem, e.g. crediting an unknown subscription.
=== CommitOffset
```
CommitOffset => Key Version CorrelationId Reference Stream Offset
Key => int16 // 9
Version => int16
CorrelationId => int32 // not used yet
Reference => string // max 256 characters
Stream => string
Offset => int64
```
=== QueryOffset
```
QueryOffsetRequest => Key Version CorrelationId Reference Stream
Key => int16 // 10
Version => int16
CorrelationId => int32
Reference => string // max 256 characters
Stream => string
QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
Key => int16 // 10
Version => int16
CorrelationId => int32
ResponseCode => int16
Offset => uint64
```
=== Unsubscribe
```
Unsubscribe => Key Version CorrelationId SubscriptionId
Key => int16 // 11
Version => int16
CorrelationId => int32
SubscriptionId => uint8
```
=== Create
```
Create => Key Version CorrelationId Stream Arguments
Key => int16 // 12
Version => int16
CorrelationId => int32
Stream => string
Arguments => [Argument]
Argument => Key Value
Key => string
Value => string
```
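A short Python sketch of a `Create` frame, with the arguments passed as a dictionary of string pairs. The function names are hypothetical, the version value `0` is a placeholder, strings are assumed to be UTF-8, and the argument name in the example is only illustrative.

```python
import struct

CREATE_KEY = 12
VERSION = 0  # placeholder: this document does not state the version number


def encode_string(value):
    """string: int16 length followed by the content (assumed UTF-8)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_create(correlation_id, stream, arguments):
    """Build a Create frame; arguments is a dict of string keys and values."""
    payload = struct.pack(">hhi", CREATE_KEY, VERSION, correlation_id)
    payload += encode_string(stream)
    payload += struct.pack(">i", len(arguments))  # assumed: number of pairs
    for name, value in arguments.items():
        payload += encode_string(name) + encode_string(value)
    return struct.pack(">i", len(payload)) + payload


if __name__ == "__main__":
    # "max-length-bytes" is used here purely as an example argument name.
    print(encode_create(1, "invoices", {"max-length-bytes": "1000000"}).hex())
```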
=== Delete
```
Delete => Key Version CorrelationId Stream
Key => int16 // 13
Version => int16
CorrelationId => int32
Stream => string
```
=== Metadata
```
MetadataQuery => Key Version CorrelationId [Stream]
Key => int16 // 14
Version => int16
CorrelationId => int32
Stream => string
MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
Key => int16 // 14
Version => int16
CorrelationId => int32
Broker => Reference Host Port
Reference => int16
Host => string
Port => int32
StreamMetadata => StreamName ResponseCode LeaderReference ReplicasReferences
StreamName => string
ResponseCode => int16
LeaderReference => int16
ReplicasReferences => [int16]
```
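The `MetadataResponse` is the most nested structure in this reference, so a decoding sketch may help. It reads the fields in the order they are listed above (with `ResponseCode` right after `StreamName`) and resolves the leader and replica references against the broker list. The `Reader` class and function name are hypothetical, array lengths are assumed to be element counts, and strings are assumed to be UTF-8.

```python
import struct


class Reader:
    """Sequential reader over a frame payload (size prefix already removed)."""

    def __init__(self, data):
        self.data, self.pos = data, 0

    def read(self, fmt):
        values = struct.unpack_from(fmt, self.data, self.pos)
        self.pos += struct.calcsize(fmt)
        return values if len(values) > 1 else values[0]

    def string(self):
        length = self.read(">h")
        if length < 0:
            return None  # length -1 indicates null
        raw = self.data[self.pos:self.pos + length]
        self.pos += length
        return raw.decode("utf-8")


def decode_metadata_response(payload):
    """Return (correlation_id, {stream: {code, leader, replicas}})."""
    r = Reader(payload)
    _key, _version, correlation_id = r.read(">hhi")
    brokers = {}
    for _ in range(r.read(">i")):
        reference = r.read(">h")
        host = r.string()
        brokers[reference] = (host, r.read(">i"))
    streams = {}
    for _ in range(r.read(">i")):
        name = r.string()
        code, leader = r.read(">hh")
        replicas = [r.read(">h") for _ in range(r.read(">i"))]
        streams[name] = {
            "code": code,
            "leader": brokers.get(leader),
            "replicas": [brokers.get(ref) for ref in replicas],
        }
    return correlation_id, streams
```

A reference that does not appear in the broker list resolves to `None` here; how a real client should treat that case is not covered by this document.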
=== MetadataUpdate
```
MetadataUpdate => Key Version MetadataInfo
Key => int16 // 15
Version => int16
MetadataInfo => Code Stream
Code => int16 // code to identify the information
Stream => string // the stream implied
```
=== PeerProperties
```
PeerPropertiesRequest => Key Version CorrelationId PeerProperties
Key => int16 // 16
Version => int16
CorrelationId => int32
PeerProperties => [PeerProperty]
PeerProperty => Key Value
Key => string
Value => string
PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
Key => int16 // 16
Version => int16
CorrelationId => int32
ResponseCode => int16
PeerProperties => [PeerProperty]
PeerProperty => Key Value
Key => string
Value => string
```
=== SaslHandshake
```
SaslHandshakeRequest => Key Version CorrelationId Mechanism
Key => int16 // 17
Version => int16
CorrelationId => int32
SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
Key => int16 // 17
Version => int16
CorrelationId => int32
ResponseCode => int16
Mechanism => string
```
=== SaslAuthenticate
```
SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
Key => int16 // 18
Version => int16
CorrelationId => int32
Mechanism => string
SaslOpaqueData => bytes
SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
Key => int16 // 18
Version => int16
CorrelationId => int32
ResponseCode => int16
SaslOpaqueData => bytes
```
=== Tune
```
TuneRequest => Key Version FrameMax Heartbeat
Key => int16 // 19
Version => int16
FrameMax => int32 // in bytes, 0 means no limit
Heartbeat => int32 // in seconds, 0 means no heartbeat
TuneResponse => TuneRequest
```
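Since `TuneResponse` has the same layout as `TuneRequest`, a client can answer by decoding the server's frame, adjusting the two values, and re-encoding. The sketch below shows one possible policy, which is an assumption and not mandated by this reference: treat `0` as "no preference" and otherwise pick the lower of the two values. The function names are hypothetical.

```python
import struct

TUNE = struct.Struct(">hhii")  # Key, Version, FrameMax, Heartbeat


def negotiate(server_value, client_value):
    """Pick the lower value, treating 0 (no limit / disabled) as no preference."""
    if server_value == 0:
        return client_value
    if client_value == 0:
        return server_value
    return min(server_value, client_value)


def tune_response(request_payload, client_frame_max, client_heartbeat):
    """Build the client's Tune frame from the server's Tune payload."""
    key, version, frame_max, heartbeat = TUNE.unpack(request_payload)
    payload = TUNE.pack(
        key,
        version,
        negotiate(frame_max, client_frame_max),
        negotiate(heartbeat, client_heartbeat),
    )
    return struct.pack(">i", len(payload)) + payload


if __name__ == "__main__":
    server_tune = TUNE.pack(19, 0, 1048576, 60)  # sample values only
    print(tune_response(server_tune, 0, 30).hex())
```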
=== Open
```
OpenRequest => Key Version CorrelationId VirtualHost
Key => int16 // 20
Version => int16
CorrelationId => int32
VirtualHost => string
OpenResponse => Key Version CorrelationId ResponseCode
Key => int16 // 20
Version => int16
CorrelationId => int32
ResponseCode => int16
```
=== Close
```
CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
Key => int16 // 21
Version => int16
CorrelationId => int32
ClosingCode => int16
ClosingReason => string
CloseResponse => Key Version CorrelationId ResponseCode
Key => int16 // 21
Version => int16
CorrelationId => int32
ResponseCode => int16
```
=== Heartbeat
```
Heartbeat => Key Version
Key => int16 // 22
Version => int16
```
== Authentication
Once a client is connected to the server, it initiates an authentication
sequence. The next figure shows the steps of the sequence:
[ditaa]
.Authentication Sequence
....
Client Server
+ +
| Peer Properties Exchange |
|-------------------------->|
|<--------------------------|
| |
| SASL Handshake |
|-------------------------->|
|<--------------------------|
| |
| SASL Authenticate |
|-------------------------->|
|<--------------------------|
| |
| Tune |
|<--------------------------|
|-------------------------->|
| |
| Open |
|-------------------------->|
|<--------------------------|
| |
+ +
....
* SaslHandshake: the client asks about the SASL mechanisms the server supports. It
can then pick one from the list the server returns.
* SaslAuthenticate: the client answers the server's challenge(s), using the
SASL mechanism it picked. The server sends a `Tune` frame once it is satisfied
with the client's authentication response.
* Tune: the server sends a `Tune` frame to suggest some settings (max frame size, heartbeat).
The client answers with a `Tune` frame containing the settings it agrees on, possibly adjusted
from the server's suggestions.
* Open: the client sends an `Open` frame to pick a virtual host to connect to. The server
answers whether it accepts the access or not.
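The sequence above can be summarized in code as an ordered list of exchanges, together with a sketch of the `SaslAuthenticate` request for the `PLAIN` mechanism. This assumes the server lists `PLAIN` in its `SaslHandshake` response; the opaque data follows the `PLAIN` message format of RFC 4616 with an empty authorization identity. The function names are hypothetical, the version value `0` is a placeholder, and strings are assumed to be UTF-8.

```python
import struct

VERSION = 0  # placeholder: this document does not state the version number

# (command, key, side that sends the first frame of the exchange)
AUTHENTICATION_SEQUENCE = [
    ("PeerProperties", 16, "client"),
    ("SaslHandshake", 17, "client"),
    ("SaslAuthenticate", 18, "client"),
    ("Tune", 19, "server"),
    ("Open", 20, "client"),
]


def sasl_plain(username, password):
    """PLAIN message: authzid NUL authcid NUL passwd, with an empty authzid."""
    return b"\x00" + username.encode("utf-8") + b"\x00" + password.encode("utf-8")


def encode_sasl_authenticate(correlation_id, mechanism, opaque_data):
    """Build a SaslAuthenticateRequest frame."""
    name = mechanism.encode("utf-8")
    payload = struct.pack(">hhi", 18, VERSION, correlation_id)
    payload += struct.pack(">h", len(name)) + name
    payload += struct.pack(">i", len(opaque_data)) + opaque_data
    return struct.pack(">i", len(payload)) + payload


if __name__ == "__main__":
    for command, key, sender in AUTHENTICATION_SEQUENCE:
        print("%-16s key=%d initiated by %s" % (command, key, sender))
    # Sample credentials for illustration only.
    print(encode_sasl_authenticate(3, "PLAIN", sasl_plain("guest", "guest")).hex())
```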