= RabbitMQ Streams Protocol Reference

This is the reference for the RabbitMQ Streams protocol.

The https://github.com/rabbitmq/rabbitmq-stream-java-client[RabbitMQ Stream Java client]
is currently the reference implementation.

== Types

int8, int16, int32, int64 - Signed integers (big endian order)

uint8, uint16, uint32, uint64 - Unsigned integers (big endian order)

bytes - int32 for the length followed by the bytes of content; a length of -1 indicates null.

string - int16 for the length followed by the bytes of UTF-8-encoded content; a length of -1 indicates null.

arrays - int32 for the length followed by the repetition of the structure. The notation uses [], e.g.
[int32] for an array of int32.

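The types above map directly onto big-endian packing. The following is a minimal sketch using Python's `struct` module; the helper names are hypothetical, and reading the array "length" as an element count is an assumption to check against the reference client.

```python
import struct

def encode_string(value):
    """string: int16 length + UTF-8 bytes; None is encoded as length -1."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_bytes(value):
    """bytes: int32 length + content; None is encoded as length -1."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value

def encode_array(items, encode_item):
    """array: int32 element count (assumed) followed by each encoded element."""
    return struct.pack(">i", len(items)) + b"".join(encode_item(i) for i in items)

def decode_string(buf, offset=0):
    """Return (value, next_offset) for a string starting at offset."""
    (length,) = struct.unpack_from(">h", buf, offset)
    offset += 2
    if length < 0:
        return None, offset
    return buf[offset:offset + length].decode("utf-8"), offset + length
```
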
== Frame Structure

```
Frame => Size (Request | Response | Command)
  Size => uint32 (size without the 4 bytes of the size element)

Request => Key Version (CorrelationId) Content
  Key => uint16
  Version => uint16
  CorrelationId => uint32
  Content => bytes // see command details below

Response => Key Version CorrelationId ResponseCode
  Key => uint16
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16

Command => Key Version Content
  Key => uint16
  Version => uint16
  Content => bytes // see command details below
```

Most commands are request/reply, but some commands (e.g. `Deliver`) are one-direction only and thus
do not contain a correlation ID.

Some responses carry more information than just the response code; this is specified in the command definition.

Keys are uint16, but the actual value is defined on the last 15 bits, the most significant bit being
used to distinguish a request (0) from a response (1). Example for `subscribe`
(key is 7):

```
0x0007 => subscribe request
0x8007 => subscribe response
```

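The size prefix and the request/response bit can be handled with a few helpers. This is a sketch under stated assumptions: the function names are hypothetical, and the example uses `0x0007`, the key listed for `subscribe` in the commands table.

```python
import struct

RESPONSE_FLAG = 0x8000  # most significant bit of the uint16 key

def is_response(key):
    """True when the most significant bit of the key is set."""
    return bool(key & RESPONSE_FLAG)

def command_id(key):
    """Strip the request/response bit, keeping the 15-bit command value."""
    return key & 0x7FFF

def request_header(key, version, correlation_id):
    """Key, Version and CorrelationId of a request, in wire order."""
    return struct.pack(">HHI", key, version, correlation_id)

def frame(payload):
    """Prefix a payload with its uint32 size (the size excludes these 4 bytes)."""
    return struct.pack(">I", len(payload)) + payload
```
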
== Response Codes

.Stream Protocol Response Codes
|===
|Response|Code

|OK|0x01
|Stream does not exist|0x02
|Subscription ID already exists|0x03
|Subscription ID does not exist|0x04
|Stream already exists|0x05
|Stream not available|0x06
|SASL mechanism not supported|0x07
|Authentication failure|0x08
|SASL error|0x09
|SASL challenge|0x0a
|SASL authentication failure loopback|0x0b
|Virtual host access failure|0x0c
|Unknown frame|0x0d
|Frame too large|0x0e
|Internal error|0x0f
|Access refused|0x10
|Precondition failed|0x11
|Publisher does not exist|0x12
|No offset|0x13

|===

== Commands

.Stream Protocol Commands
|===
|Command |From |Key |Expects response?

|<<declarepublisher>>
|Client
|0x0001
|Yes

|<<publish>>
|Client
|0x0002
|No

|<<publishconfirm>>
|Server
|0x0003
|No

|<<publisherror>>
|Server
|0x0004
|No

|<<querypublishersequence>>
|Client
|0x0005
|Yes

|<<deletepublisher>>
|Client
|0x0006
|Yes

|<<subscribe>>
|Client
|0x0007
|Yes

|<<deliver>>
|Server
|0x0008
|No

|<<credit>>
|Client
|0x0009
|No

|<<storeoffset>>
|Client
|0x000a
|No

|<<queryoffset>>
|Client
|0x000b
|Yes

|<<unsubscribe>>
|Client
|0x000c
|Yes

|<<create>>
|Client
|0x000d
|Yes

|<<delete>>
|Client
|0x000e
|Yes

|<<metadata>>
|Client
|0x000f
|Yes

|<<metadataupdate>>
|Server
|0x0010
|No

|<<peerproperties>>
|Client
|0x0011
|Yes

|<<saslhandshake>>
|Client
|0x0012
|Yes

|<<saslauthenticate>>
|Client
|0x0013
|Yes

|<<tune>>
|Server
|0x0014
|Yes

|<<open>>
|Client
|0x0015
|Yes

|<<close>>
|Client & Server
|0x0016
|Yes

|<<heartbeat>>
|Client & Server
|0x0017
|No

|<<route>>
|Client
|0x0018
|Yes

|<<partitions>>
|Client
|0x0019
|Yes

|<<consumerupdate>>
|Server
|0x001a
|Yes

|<<exchangecommandversions>>
|Client
|0x001b
|Yes

|<<streamstats>>
|Client
|0x001c
|Yes

|<<createsuperstream>>
|Client
|0x001d
|Yes

|<<deletesuperstream>>
|Client
|0x001e
|Yes

|===

=== DeclarePublisher

```
DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
  Key => uint16 // 0x0001
  Version => uint16
  CorrelationId => uint32
  PublisherId => uint8
  PublisherReference => string // max 256 characters
  Stream => string

DeclarePublisherResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x8001
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
```

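As an illustration of the `DeclarePublisher` exchange, the sketch below builds the request frame and parses the four-field response. The function names are hypothetical, and the default `version=1` is an assumption.

```python
import struct

def _string(value):
    """string: int16 length followed by the UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def declare_publisher_request(correlation_id, publisher_id, reference, stream, version=1):
    """Build a full DeclarePublisher frame, size prefix included."""
    body = struct.pack(">HHIB", 0x0001, version, correlation_id, publisher_id)
    body += _string(reference) + _string(stream)
    return struct.pack(">I", len(body)) + body

def parse_simple_response(body):
    """Parse Key Version CorrelationId ResponseCode (frame without the size prefix)."""
    return struct.unpack_from(">HHIH", body, 0)
```

The same `parse_simple_response` shape applies to any response that carries only a response code.
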
=== Publish

Version 1

```
Publish => Key Version PublisherId PublishedMessages
  Key => uint16 // 0x0002
  Version => uint16
  PublisherId => uint8
  PublishedMessages => [PublishedMessage]
  PublishedMessage => PublishingId Message
  PublishingId => uint64
  Message => bytes
```

Version 2

```
Publish => Key Version PublisherId PublishedMessages
  Key => uint16 // 0x0002
  Version => uint16
  PublisherId => uint8
  PublishedMessages => [PublishedMessage]
  PublishedMessage => PublishingId FilterValue Message
  PublishingId => uint64
  FilterValue => string
  Message => bytes
```

1. Use version 1 if there is no filter value.
2. Use version 2 if there is a filter value.

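The two `Publish` versions differ only in the per-message filter value. A minimal sketch, assuming the filter value sits between the publishing ID and the message as the version 2 field list suggests; `publish_frame` and its parameters are hypothetical names.

```python
import struct

def publish_frame(publisher_id, messages, filter_values=None):
    """Build a Publish frame.

    messages: list of (publishing_id, payload_bytes).
    filter_values: optional list of strings, one per message; passing it
    selects version 2, otherwise version 1 is used.
    """
    version = 2 if filter_values is not None else 1
    body = struct.pack(">HHB", 0x0002, version, publisher_id)
    body += struct.pack(">i", len(messages))
    for index, (publishing_id, payload) in enumerate(messages):
        body += struct.pack(">Q", publishing_id)
        if filter_values is not None:
            value = filter_values[index].encode("utf-8")
            body += struct.pack(">h", len(value)) + value
        body += struct.pack(">i", len(payload)) + payload
    return struct.pack(">I", len(body)) + body
```
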
=== PublishConfirm

```
PublishConfirm => Key Version PublisherId PublishingIds
  Key => uint16 // 0x0003
  Version => uint16
  PublisherId => uint8
  PublishingIds => [uint64] // to correlate with the messages sent
```

=== PublishError

```
PublishError => Key Version PublisherId [PublishingError]
  Key => uint16 // 0x0004
  Version => uint16
  PublisherId => uint8
  PublishingError => PublishingId Code
  PublishingId => uint64
  Code => uint16 // code to identify the problem
```

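`PublishConfirm` and `PublishError` are both server-sent arrays keyed by publishing ID. The sketch below decodes them, assuming `PublisherId` sits between `Version` and the array, as the field lists suggest; the function names are hypothetical.

```python
import struct

def parse_publish_confirm(body):
    """Return (publisher_id, [publishing_id, ...]) from a frame without size prefix."""
    _key, _version, publisher_id, count = struct.unpack_from(">HHBi", body, 0)
    ids = struct.unpack_from(">%dQ" % count, body, 9)
    return publisher_id, list(ids)

def parse_publish_error(body):
    """Return (publisher_id, [(publishing_id, code), ...])."""
    _key, _version, publisher_id, count = struct.unpack_from(">HHBi", body, 0)
    # Each PublishingError is a uint64 followed by a uint16: 10 bytes.
    errors = [struct.unpack_from(">QH", body, 9 + i * 10) for i in range(count)]
    return publisher_id, errors
```
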
=== QueryPublisherSequence

```
QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
  Key => uint16 // 0x0005
  Version => uint16
  CorrelationId => uint32
  PublisherReference => string // max 256 characters
  Stream => string

QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
  Key => uint16 // 0x8005
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Sequence => uint64
```

=== DeletePublisher

```
DeletePublisherRequest => Key Version CorrelationId PublisherId
  Key => uint16 // 0x0006
  Version => uint16
  CorrelationId => uint32
  PublisherId => uint8

DeletePublisherResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x8006
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
```

=== Subscribe

```
Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit Properties
  Key => uint16 // 0x0007
  Version => uint16
  CorrelationId => uint32 // correlation id to correlate the response
  SubscriptionId => uint8 // client-supplied id to identify the subscription
  Stream => string // the name of the stream
  OffsetSpecification => OffsetType Offset
    OffsetType => uint16 // 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
    Offset => uint64 (for offset) | int64 (for timestamp)
  Credit => uint16
  Properties => [Property]
  Property => Key Value
  Key => string
  Value => string
```

NB: Timestamp is https://www.erlang.org/doc/apps/erts/time_correction.html#Erlang_System_Time[Erlang system time],
in milliseconds since the epoch.

Supported properties:

* `single-active-consumer`: set to `true` to enable https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/[single active consumer] for this subscription.
* `super-stream`: set to the name of the super stream the subscribed stream is a partition of.
* `filter.` (e.g. `filter.0`, `filter.1`, etc.): prefix to use to define filter values for the subscription.
* `match-unfiltered`: whether to return messages without any filter value or not.

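A `Subscribe` frame can be sketched as follows. Two points are assumptions rather than statements of the protocol: the `Offset` field is written only for types 4 and 5 (one reading of the "(for offset) | (for timestamp)" annotation), and the properties array is always written, even when empty. The names are hypothetical; check both points against the reference client.

```python
import struct

OFFSET_FIRST, OFFSET_LAST, OFFSET_NEXT, OFFSET_OFFSET, OFFSET_TIMESTAMP = 1, 2, 3, 4, 5

def _string(value):
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def subscribe_frame(correlation_id, subscription_id, stream, offset_type,
                    offset=0, credit=10, properties=None, version=1):
    """Build a Subscribe frame, size prefix included."""
    body = struct.pack(">HHIB", 0x0007, version, correlation_id, subscription_id)
    body += _string(stream)
    body += struct.pack(">H", offset_type)
    if offset_type == OFFSET_OFFSET:
        body += struct.pack(">Q", offset)   # uint64 offset
    elif offset_type == OFFSET_TIMESTAMP:
        body += struct.pack(">q", offset)   # int64 timestamp in milliseconds
    body += struct.pack(">H", credit)
    properties = properties or {}
    body += struct.pack(">i", len(properties))
    for key, value in properties.items():
        body += _string(key) + _string(value)
    return struct.pack(">I", len(body)) + body
```

A single active consumer subscription would pass `properties={"single-active-consumer": "true"}`.
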
=== Deliver

Version 1

```
Deliver => Key Version SubscriptionId OsirisChunk
  Key => uint16 // 0x0008
  Version => uint16
  SubscriptionId => uint8
  OsirisChunk => MagicVersion ChunkType NumEntries NumRecords Timestamp Epoch ChunkFirstOffset ChunkCrc DataLength TrailerLength BloomSize Reserved Messages
    MagicVersion => int8
    ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
    NumEntries => uint16
    NumRecords => uint32
    Timestamp => int64 // erlang system time in milliseconds, since epoch
    Epoch => uint64
    ChunkFirstOffset => uint64
    ChunkCrc => int32
    DataLength => uint32
    TrailerLength => uint32
    BloomSize => uint8 // size of bloom filter data, ignored at the moment
    Reserved => uint24 // 24 bits reserved for future use
    Messages => [Message] // a continuous collection of messages, the size of the array is defined by NumEntries
    Message => EntryTypeAndSize
    Data => bytes
```

Version 2

```
Deliver => Key Version SubscriptionId CommittedChunkId OsirisChunk
  Key => uint16 // 0x0008
  Version => uint16
  SubscriptionId => uint8
  CommittedChunkId => uint64
  OsirisChunk => MagicVersion ChunkType NumEntries NumRecords Timestamp Epoch ChunkFirstOffset ChunkCrc DataLength TrailerLength BloomSize Reserved Messages
    MagicVersion => int8
    ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
    NumEntries => uint16
    NumRecords => uint32
    Timestamp => int64 // erlang system time in milliseconds, since epoch
    Epoch => uint64
    ChunkFirstOffset => uint64
    ChunkCrc => int32
    DataLength => uint32
    TrailerLength => uint32
    BloomSize => uint8 // size of bloom filter data, ignored at the moment
    Reserved => uint24 // 24 bits reserved for future use
    Messages => [Message] // a continuous collection of messages, the size of the array is defined by NumEntries
    Message => EntryTypeAndSize
    Data => bytes
```

NB: See the https://github.com/rabbitmq/osiris/blob/12a430b11be2c2be3f26ce4f2d7268954c7ec02b/src/osiris_log.erl#L126-L195[Osiris project]
for details on the structure of messages.

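The fixed part of the chunk (everything before `Messages`) adds up to 48 bytes, so it can be read in one step. The sketch below parses only that header and reports where the remaining payload starts; it deliberately does not interpret the entries, whose layout is defined by Osiris. The names are hypothetical, and treating the version 2 field as a uint64 placed right after `SubscriptionId` follows the field list above.

```python
import struct
from collections import namedtuple

# int8 int8 uint16 uint32 int64 uint64 uint64 int32 uint32 uint32 uint8 + 3 reserved bytes
CHUNK_HEADER = struct.Struct(">bbHIqQQiIIB3s")

ChunkHeader = namedtuple(
    "ChunkHeader",
    "magic_version chunk_type num_entries num_records timestamp epoch "
    "first_offset crc data_length trailer_length bloom_size reserved",
)

def parse_deliver(body):
    """Parse a Deliver frame (without size prefix) up to the end of the chunk header.

    Returns (subscription_id, committed_chunk_id, header, payload_offset);
    committed_chunk_id is None for version 1.
    """
    _key, version, subscription_id = struct.unpack_from(">HHB", body, 0)
    offset = 5
    committed_chunk_id = None
    if version >= 2:
        (committed_chunk_id,) = struct.unpack_from(">Q", body, offset)
        offset += 8
    header = ChunkHeader._make(CHUNK_HEADER.unpack_from(body, offset))
    return subscription_id, committed_chunk_id, header, offset + CHUNK_HEADER.size
```
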
=== Credit

```
Credit => Key Version SubscriptionId Credit
  Key => uint16 // 0x0009
  Version => uint16
  SubscriptionId => uint8
  Credit => uint16 // the number of chunks that can be sent

CreditResponse => Key Version ResponseCode SubscriptionId
  Key => uint16 // 0x8009
  Version => uint16
  ResponseCode => uint16
  SubscriptionId => uint8
```

NB: the server sends a response only in case of a problem, e.g. crediting an unknown subscription.

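Because `Credit` carries no correlation ID, an error response can only be matched by subscription ID. A short sketch with hypothetical function names and an assumed `version=1`:

```python
import struct

def credit_frame(subscription_id, credit, version=1):
    """Grant `credit` more chunks to a subscription."""
    body = struct.pack(">HHBH", 0x0009, version, subscription_id, credit)
    return struct.pack(">I", len(body)) + body

def parse_credit_response(body):
    """Return (response_code, subscription_id) from a frame without size prefix."""
    _key, _version, code, subscription_id = struct.unpack_from(">HHHB", body, 0)
    return code, subscription_id
```
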
=== StoreOffset

```
StoreOffset => Key Version Reference Stream Offset
  Key => uint16 // 0x000a
  Version => uint16
  Reference => string // max 256 characters
  Stream => string // the name of the stream
  Offset => uint64
```

=== QueryOffset

```
QueryOffsetRequest => Key Version CorrelationId Reference Stream
  Key => uint16 // 0x000b
  Version => uint16
  CorrelationId => uint32
  Reference => string // max 256 characters
  Stream => string

QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
  Key => uint16 // 0x800b
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Offset => uint64
```

=== Unsubscribe

```
Unsubscribe => Key Version CorrelationId SubscriptionId
  Key => uint16 // 0x000c
  Version => uint16
  CorrelationId => uint32
  SubscriptionId => uint8

UnsubscribeResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x800c
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
```

=== Create

```
Create => Key Version CorrelationId Stream Arguments
  Key => uint16 // 0x000d
  Version => uint16
  CorrelationId => uint32
  Stream => string
  Arguments => [Argument]
  Argument => Key Value
  Key => string
  Value => string
```

=== Delete

```
Delete => Key Version CorrelationId Stream
  Key => uint16 // 0x000e
  Version => uint16
  CorrelationId => uint32
  Stream => string
```

=== Metadata

```
MetadataQuery => Key Version CorrelationId [Stream]
  Key => uint16 // 0x000f
  Version => uint16
  CorrelationId => uint32
  Stream => string

MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
  Key => uint16 // 0x800f
  Version => uint16
  CorrelationId => uint32
  Broker => Reference Host Port
    Reference => uint16
    Host => string
    Port => uint32
  StreamMetadata => StreamName ResponseCode LeaderReference ReplicasReferences
    StreamName => string
    ResponseCode => uint16
    LeaderReference => uint16
    ReplicasReferences => [uint16]
```

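The `MetadataResponse` lists brokers once and then refers to them by `Reference` from each stream entry. The sketch below decodes both arrays and resolves the references to `(host, port)` pairs; the function names and the shape of the returned dictionary are hypothetical choices.

```python
import struct

def _read_string(buf, offset):
    (length,) = struct.unpack_from(">h", buf, offset)
    offset += 2
    if length < 0:
        return None, offset
    return buf[offset:offset + length].decode("utf-8"), offset + length

def parse_metadata_response(body):
    """Return (correlation_id, {stream: {"code", "leader", "replicas"}})."""
    _key, _version, correlation_id = struct.unpack_from(">HHI", body, 0)
    offset = 8
    (broker_count,) = struct.unpack_from(">i", body, offset)
    offset += 4
    brokers = {}
    for _ in range(broker_count):
        (reference,) = struct.unpack_from(">H", body, offset)
        host, offset = _read_string(body, offset + 2)
        (port,) = struct.unpack_from(">I", body, offset)
        offset += 4
        brokers[reference] = (host, port)
    (stream_count,) = struct.unpack_from(">i", body, offset)
    offset += 4
    streams = {}
    for _ in range(stream_count):
        name, offset = _read_string(body, offset)
        code, leader, replica_count = struct.unpack_from(">HHi", body, offset)
        offset += 8
        replicas = struct.unpack_from(">%dH" % replica_count, body, offset)
        offset += 2 * replica_count
        streams[name] = {
            "code": code,
            "leader": brokers.get(leader),  # None if the reference is unknown
            "replicas": [brokers.get(r) for r in replicas],
        }
    return correlation_id, streams
```
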
=== MetadataUpdate

```
MetadataUpdate => Key Version MetadataInfo
  Key => uint16 // 0x0010
  Version => uint16
  MetadataInfo => Code Stream
  Code => uint16 // code to identify the information
  Stream => string // the stream implied
```

=== PeerProperties

```
PeerPropertiesRequest => Key Version CorrelationId PeerProperties
  Key => uint16 // 0x0011
  Version => uint16
  CorrelationId => uint32
  PeerProperties => [PeerProperty]
  PeerProperty => Key Value
  Key => string
  Value => string

PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
  Key => uint16 // 0x8011
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  PeerProperties => [PeerProperty]
  PeerProperty => Key Value
  Key => string
  Value => string
```

=== SaslHandshake

```
SaslHandshakeRequest => Key Version CorrelationId Mechanism
  Key => uint16 // 0x0012
  Version => uint16
  CorrelationId => uint32

SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanisms]
  Key => uint16 // 0x8012
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Mechanisms => [Mechanism]
  Mechanism => string
```

=== SaslAuthenticate

```
SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
  Key => uint16 // 0x0013
  Version => uint16
  CorrelationId => uint32
  Mechanism => string
  SaslOpaqueData => bytes

SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
  Key => uint16 // 0x8013
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  SaslOpaqueData => bytes
```

=== Tune

```
TuneRequest => Key Version FrameMax Heartbeat
  Key => uint16 // 0x0014
  Version => uint16
  FrameMax => uint32 // in bytes, 0 means no limit
  Heartbeat => uint32 // in seconds, 0 means no heartbeat

TuneResponse => TuneRequest
```

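Since `0` means "no limit" (or "no heartbeat"), picking the client's answer to a `Tune` request needs a small special case. The policy below, taking the smaller non-zero value, is an assumption for illustration; the protocol text only says the client may adjust the server's suggestions. The function names are hypothetical.

```python
import struct

def parse_tune(body):
    """Return (frame_max, heartbeat) from a Tune frame without size prefix."""
    _key, _version, frame_max, heartbeat = struct.unpack_from(">HHII", body, 0)
    return frame_max, heartbeat

def negotiate(client_value, server_value):
    """Pick the smaller of two settings, treating 0 as 'unlimited/disabled'."""
    if client_value == 0 or server_value == 0:
        # One side has no preference: defer to the other side's value.
        return max(client_value, server_value)
    return min(client_value, server_value)
```
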
=== Open

```
OpenRequest => Key Version CorrelationId VirtualHost
  Key => uint16 // 0x0015
  Version => uint16
  CorrelationId => uint32
  VirtualHost => string

OpenResponse => Key Version CorrelationId ResponseCode ConnectionProperties
  Key => uint16 // 0x8015
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  ConnectionProperties => [ConnectionProperty]
  ConnectionProperty => Key Value
  Key => string
  Value => string
```

=== Close

```
CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
  Key => uint16 // 0x0016
  Version => uint16
  CorrelationId => uint32
  ClosingCode => uint16
  ClosingReason => string

CloseResponse => Key Version CorrelationId ResponseCode
  Key => uint16 // 0x8016
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
```

=== Heartbeat

```
Heartbeat => Key Version
  Key => uint16 // 0x0017
  Version => uint16
```

=== Route

```
RouteQuery => Key Version CorrelationId RoutingKey SuperStream
  Key => uint16 // 0x0018
  Version => uint16
  CorrelationId => uint32
  RoutingKey => string
  SuperStream => string

RouteResponse => Key Version CorrelationId ResponseCode [Stream]
  Key => uint16 // 0x8018
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Stream => string
```

=== Partitions

```
PartitionsQuery => Key Version CorrelationId SuperStream
  Key => uint16 // 0x0019
  Version => uint16
  CorrelationId => uint32
  SuperStream => string

PartitionsResponse => Key Version CorrelationId ResponseCode [Stream]
  Key => uint16 // 0x8019
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Stream => string
```

=== ConsumerUpdate

```
ConsumerUpdateQuery => Key Version CorrelationId SubscriptionId Active
  Key => uint16 // 0x001a
  Version => uint16
  CorrelationId => uint32
  SubscriptionId => uint8
  Active => uint8 (boolean, 0 = false, 1 = true)

ConsumerUpdateResponse => Key Version CorrelationId ResponseCode OffsetSpecification
  Key => uint16 // 0x801a
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  OffsetSpecification => OffsetType Offset
    OffsetType => uint16 // 0 (none), 1 (first), 2 (last), 3 (next), 4 (offset), 5 (timestamp)
    Offset => uint64 (for offset) | int64 (for timestamp)
```

=== ExchangeCommandVersions

```
CommandVersionsExchangeRequest => Key Version CorrelationId [Command]
  Key => uint16 // 0x001b
  Version => uint16
  CorrelationId => uint32
  Command => Key MinVersion MaxVersion
    Key => uint16
    MinVersion => uint16
    MaxVersion => uint16

CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Command]
  Key => uint16 // 0x801b
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Command => Key MinVersion MaxVersion
    Key => uint16
    MinVersion => uint16
    MaxVersion => uint16
```

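Once both sides have exchanged their supported ranges, each command needs a version both understand. The sketch below parses the `[Command]` array and picks the highest version in the overlap of the two ranges; that selection rule is an assumption for illustration, not something this reference specifies, and the function names are hypothetical.

```python
import struct

def parse_commands(body, offset):
    """Read a [Command] array starting at offset into {key: (min, max)}."""
    (count,) = struct.unpack_from(">i", body, offset)
    offset += 4
    commands = {}
    for _ in range(count):
        key, low, high = struct.unpack_from(">HHH", body, offset)
        commands[key] = (low, high)
        offset += 6
    return commands

def usable_versions(client, server):
    """Return {key: highest version supported by both sides}."""
    result = {}
    for key, (client_min, client_max) in client.items():
        if key not in server:
            continue
        server_min, server_max = server[key]
        low, high = max(client_min, server_min), min(client_max, server_max)
        if low <= high:  # the two ranges overlap
            result[key] = high
    return result
```

With these ranges, a client supporting `Publish` versions 1 to 2 against a server that only supports version 1 would stay on version 1.
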
=== StreamStats

```
StreamStatsRequest => Key Version CorrelationId Stream
  Key => uint16 // 0x001c
  Version => uint16
  CorrelationId => uint32
  Stream => string

StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
  Key => uint16 // 0x801c
  Version => uint16
  CorrelationId => uint32
  ResponseCode => uint16
  Stats => [Statistic]
  Statistic => Key Value
  Key => string
  Value => int64
```

=== CreateSuperStream

```
CreateSuperStream => Key Version CorrelationId Name [Partition] [BindingKey] Arguments
  Key => uint16 // 0x001d
  Version => uint16
  CorrelationId => uint32
  Name => string
  Partition => string
  BindingKey => string
  Arguments => [Argument]
  Argument => Key Value
  Key => string
  Value => string
```

=== DeleteSuperStream

```
DeleteSuperStream => Key Version CorrelationId Name
  Key => uint16 // 0x001e
  Version => uint16
  CorrelationId => uint32
  Name => string
```

== Authentication

Once a client is connected to the server, it initiates an authentication
sequence. The next figure shows the steps of the sequence:

[ditaa]
.Authentication Sequence
....
Client                      Server
  +                           +
  | Peer Properties Exchange  |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |      SASL Handshake       |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |     SASL Authenticate     |
  |-------------------------->|
  |<--------------------------|
  |                           |
  |           Tune            |
  |<--------------------------|
  |-------------------------->|
  |                           |
  |           Open            |
  |-------------------------->|
  |<--------------------------|
  |                           |
  +                           +
....

* SaslHandshake: the client asks about the SASL mechanisms the server supports. It
can then pick one from the list the server returns.
* SaslAuthenticate: the client answers the server's challenge(s), using the
SASL mechanism it picked. The server will send a `Tune` frame once it is satisfied
with the client authentication response.
* Tune: the server sends a `Tune` frame to suggest some settings (max frame size, heartbeat).
The client answers with a `Tune` frame with the settings it agrees on, possibly adjusted
from the server's suggestions.
* Open: the client sends an `Open` frame to pick a virtual host to connect to. The server
answers whether it accepts the access or not.

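To make the `SaslAuthenticate` step concrete, the sketch below builds the request frame for the `PLAIN` mechanism. Using `PLAIN` is an assumption (this reference does not name any mechanism); its payload layout comes from RFC 4616. The function names and the `AUTHENTICATION_STEPS` list are hypothetical helpers that mirror the figure.

```python
import struct

def _string(value):
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def _bytes(value):
    return struct.pack(">i", len(value)) + value

def sasl_plain_data(username, password):
    """RFC 4616 PLAIN message: [authzid] NUL authcid NUL passwd (empty authzid)."""
    return b"\x00" + username.encode("utf-8") + b"\x00" + password.encode("utf-8")

def sasl_authenticate_frame(correlation_id, mechanism, data, version=1):
    """Build a SaslAuthenticate request frame, size prefix included."""
    body = struct.pack(">HHI", 0x0013, version, correlation_id)
    body += _string(mechanism) + _bytes(data)
    return struct.pack(">I", len(body)) + body

# Order of the exchanges in the figure, with the side that sends first.
AUTHENTICATION_STEPS = [
    ("PeerProperties", "client"),
    ("SaslHandshake", "client"),
    ("SaslAuthenticate", "client"),
    ("Tune", "server"),
    ("Open", "client"),
]
```
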