Use MSB in keys to distinguish requests/responses

In stream protocol.
This commit is contained in:
Arnaud Cogoluègnes 2021-02-26 11:03:36 +01:00 committed by Michael Klishin
parent 4b98a0db7e
commit ad0a2cd1b9
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
4 changed files with 168 additions and 72 deletions

View File

@ -48,6 +48,15 @@ does not contain a correlation ID.
Some responses may carry additional information than just the response code, this is specified in the command definition.
Keys are int16, but the actual value is defined on the last 15 bits, the most significant bit being
used to make the difference between a request (0) and a response (1). Example for `subscribe`
(key is 6):
```
0b00000000 00000110 => subscribe request
0b10000000 00000110 => subscribe response
```
== Response Codes
.Stream Protocol Response Codes
@ -559,7 +568,9 @@ Heartbeat => Key Version
Version => int16
```
=== Route (experimental)
=== Route
_Experimental_
```
RouteQuery => Key Version CorrelationId RoutingKey SuperStream
@ -578,6 +589,8 @@ RouteResponse => Key Version CorrelationId Stream
=== Partitions (experimental)
_Experimental_
```
PartitionsQuery => Key Version CorrelationId SuperStream
Key => int16 // 24

View File

@ -24,6 +24,9 @@
-define(COMMAND_ROUTE, 23).
-define(COMMAND_PARTITIONS, 24).
-define(REQUEST, 0).
-define(RESPONSE, 1).
-define(VERSION_0, 0).
-define(RESPONSE_CODE_OK, 0).

View File

@ -294,7 +294,8 @@ listen_loop_pre_auth(Transport,
case ConnectionStep of
authenticated ->
TuneFrame =
<<?COMMAND_TUNE:16,
<<?REQUEST:1,
?COMMAND_TUNE:15,
?VERSION_0:16,
FrameMax:32,
Heartbeat:32>>,
@ -448,7 +449,8 @@ listen_loop_post_auth(Transport,
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S,
[<<FrameSize:32,
?COMMAND_METADATA_UPDATE:16,
?REQUEST:1,
?COMMAND_METADATA_UPDATE:15,
?VERSION_0:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
@ -483,7 +485,8 @@ listen_loop_post_auth(Transport,
%% in practice, this should be necessary only for very large chunks and for very small frame size limits
Transport:send(S,
[<<FrameSize:32,
?COMMAND_PUBLISH_CONFIRM:16,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16>>,
<<CurrentPublisherId:8>>,
<<Count:32>>,
@ -501,7 +504,9 @@ listen_loop_post_auth(Transport,
{FirstPublisherId, <<>>, 0}, CorrelationList),
FrameSize = 2 + 2 + 1 + 4 + LastCount * 8,
Transport:send(S,
[<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16>>,
<<LastPublisherId:8>>,
<<LastCount:32>>,
@ -547,7 +552,9 @@ listen_loop_post_auth(Transport,
PublishingIdCount = length(CorrelationList),
FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8,
Transport:send(S,
[<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16,
[<<FrameSize:32,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16>>,
<<PublisherId:8>>,
<<PublishingIdCount:32>>,
@ -635,7 +642,7 @@ listen_loop_post_auth(Transport,
State1,
Configuration);
heartbeat_send ->
Frame = <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
Frame = <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
case catch frame(Transport, Connection, Frame) of
ok ->
listen_loop_post_auth(Transport,
@ -787,7 +794,8 @@ handle_inbound_data(Transport,
CloseReason = <<"frame too large">>,
CloseReasonLength = byte_size(CloseReason),
CloseFrame =
<<?COMMAND_CLOSE:16,
<<?REQUEST:1,
?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_FRAME_TOO_LARGE:16,
@ -841,7 +849,8 @@ generate_publishing_error_details(Acc, Code,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
<<?COMMAND_PEER_PROPERTIES:16,
<<?REQUEST:1,
?COMMAND_PEER_PROPERTIES:15,
?VERSION_0:16,
CorrelationId:32,
ClientPropertiesCount:32,
@ -891,7 +900,8 @@ handle_frame_pre_auth(Transport,
<<>>, ServerProperties),
Frame =
<<?COMMAND_PEER_PROPERTIES:16,
<<?RESPONSE:1,
?COMMAND_PEER_PROPERTIES:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16,
@ -907,7 +917,9 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
<<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16,
<<?REQUEST:1,
?COMMAND_SASL_HANDSHAKE:15,
?VERSION_0:16,
CorrelationId:32>>,
Rest) ->
Mechanisms = rabbit_stream_utils:auth_mechanisms(S),
@ -919,7 +931,8 @@ handle_frame_pre_auth(Transport,
<<>>, Mechanisms),
MechanismsCount = length(Mechanisms),
Frame =
<<?COMMAND_SASL_HANDSHAKE:16,
<<?RESPONSE:1,
?COMMAND_SASL_HANDSHAKE:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16,
@ -935,7 +948,8 @@ handle_frame_pre_auth(Transport,
host = Host} =
Connection0,
State,
<<?COMMAND_SASL_AUTHENTICATE:16,
<<?REQUEST:1,
?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
CorrelationId:32,
MechanismLength:16,
@ -1031,7 +1045,8 @@ handle_frame_pre_auth(Transport,
end
end,
Frame =
<<?COMMAND_SASL_AUTHENTICATE:16,
<<?RESPONSE:1,
?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
CorrelationId:32,
FrameFragment/binary>>,
@ -1039,7 +1054,8 @@ handle_frame_pre_auth(Transport,
{C2, Rest};
{error, _} ->
Frame =
<<?COMMAND_SASL_AUTHENTICATE:16,
<<?RESPONSE:1,
?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
@ -1054,7 +1070,8 @@ handle_frame_pre_auth(_Transport,
name = ConnectionName} =
Connection,
State,
<<?COMMAND_TUNE:16,
<<?RESPONSE:1,
?COMMAND_TUNE:15,
?VERSION_0:16,
FrameMax:32,
Heartbeat:32>>,
@ -1088,7 +1105,8 @@ handle_frame_pre_auth(_Transport,
handle_frame_pre_auth(Transport,
#stream_connection{user = User, socket = S} = Connection,
State,
<<?COMMAND_OPEN:16,
<<?REQUEST:1,
?COMMAND_OPEN:15,
?VERSION_0:16,
CorrelationId:32,
VirtualHostLength:16,
@ -1101,7 +1119,8 @@ handle_frame_pre_auth(Transport,
VirtualHost,
{socket, S},
#{}),
F = <<?COMMAND_OPEN:16,
F = <<?RESPONSE:1,
?COMMAND_OPEN:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16>>,
@ -1111,7 +1130,8 @@ handle_frame_pre_auth(Transport,
F}
catch
exit:_ ->
Fr = <<?COMMAND_OPEN:16,
Fr = <<?RESPONSE:1,
?COMMAND_OPEN:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_VHOST_ACCESS_FAILURE:16>>,
@ -1124,7 +1144,7 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(_Transport,
Connection,
State,
<<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
<<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame pre auth~n"),
{Connection, State, Rest};
@ -1172,7 +1192,8 @@ handle_frame_post_auth(Transport,
publisher_to_ids = RefIds0} =
Connection0,
State,
<<?COMMAND_DECLARE_PUBLISHER:16,
<<?REQUEST:1,
?COMMAND_DECLARE_PUBLISHER:15,
?VERSION_0:16,
CorrelationId:32,
PublisherId:8,
@ -1261,7 +1282,8 @@ handle_frame_post_auth(Transport,
publishers = Publishers} =
Connection,
State,
<<?COMMAND_PUBLISH:16,
<<?REQUEST:1,
?COMMAND_PUBLISH:15,
?VERSION_0:16,
PublisherId:8/unsigned,
MessageCount:32,
@ -1299,7 +1321,8 @@ handle_frame_post_auth(Transport,
Messages),
Transport:send(S,
[<<FrameSize:32,
?COMMAND_PUBLISH_ERROR:16,
?REQUEST:1,
?COMMAND_PUBLISH_ERROR:15,
?VERSION_0:16,
PublisherId:8,
MessageCount:32,
@ -1315,7 +1338,8 @@ handle_frame_post_auth(Transport,
Messages),
Transport:send(S,
[<<FrameSize:32,
?COMMAND_PUBLISH_ERROR:16,
?REQUEST:1,
?COMMAND_PUBLISH_ERROR:15,
?VERSION_0:16,
PublisherId:8,
MessageCount:32,
@ -1328,7 +1352,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
<<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
<<?REQUEST:1,
?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
?VERSION_0:16,
CorrelationId:32,
ReferenceSize:16,
@ -1364,7 +1389,9 @@ handle_frame_post_auth(Transport,
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
end,
Transport:send(S,
[<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
[<<FrameSize:32,
?RESPONSE:1,
?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
?VERSION_0:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
@ -1375,7 +1402,8 @@ handle_frame_post_auth(Transport,
publisher_to_ids = PubToIds} =
Connection0,
State,
<<?COMMAND_DELETE_PUBLISHER:16,
<<?REQUEST:1,
?COMMAND_DELETE_PUBLISHER:15,
?VERSION_0:16,
CorrelationId:32,
PublisherId:8>>,
@ -1418,7 +1446,8 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_SUBSCRIBE:16,
<<?REQUEST:1,
?COMMAND_SUBSCRIBE:15,
?VERSION_0:16,
CorrelationId:32,
SubscriptionId:8/unsigned,
@ -1574,7 +1603,8 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_CREDIT:16,
<<?REQUEST:1,
?COMMAND_CREDIT:15,
?VERSION_0:16,
SubscriptionId:8/unsigned,
Credit:16/signed>>,
@ -1599,7 +1629,8 @@ handle_frame_post_auth(Transport,
rabbit_log:warning("Giving credit to unknown subscription: ~p~n",
[SubscriptionId]),
Frame =
<<?COMMAND_CREDIT:16,
<<?RESPONSE:1,
?COMMAND_CREDIT:15,
?VERSION_0:16,
?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16,
SubscriptionId:8>>,
@ -1612,7 +1643,8 @@ handle_frame_post_auth(_Transport,
user = User} =
Connection,
State,
<<?COMMAND_COMMIT_OFFSET:16,
<<?REQUEST:1,
?COMMAND_COMMIT_OFFSET:15,
?VERSION_0:16,
_CorrelationId:32,
ReferenceSize:16,
@ -1651,7 +1683,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
<<?COMMAND_QUERY_OFFSET:16,
<<?REQUEST:1,
?COMMAND_QUERY_OFFSET:15,
?VERSION_0:16,
CorrelationId:32,
ReferenceSize:16,
@ -1686,7 +1719,10 @@ handle_frame_post_auth(Transport,
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
end,
Transport:send(S,
[<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
[<<FrameSize:32,
?RESPONSE:1,
?COMMAND_QUERY_OFFSET:15,
?VERSION_0:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
<<Offset:64>>]),
@ -1696,7 +1732,8 @@ handle_frame_post_auth(Transport,
StreamSubscriptions} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_UNSUBSCRIBE:16,
<<?REQUEST:1,
?COMMAND_UNSUBSCRIBE:15,
?VERSION_0:16,
CorrelationId:32,
SubscriptionId:8/unsigned>>,
@ -1748,7 +1785,8 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
<<?COMMAND_CREATE_STREAM:16,
<<?REQUEST:1,
?COMMAND_CREATE_STREAM:15,
?VERSION_0:16,
CorrelationId:32,
StreamSize:16,
@ -1831,7 +1869,8 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
<<?COMMAND_DELETE_STREAM:16,
<<?REQUEST:1,
?COMMAND_DELETE_STREAM:15,
?VERSION_0:16,
CorrelationId:32,
StreamSize:16,
@ -1862,7 +1901,8 @@ handle_frame_post_auth(Transport,
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S,
[<<FrameSize:32,
?COMMAND_METADATA_UPDATE:16,
?REQUEST:1,
?COMMAND_METADATA_UPDATE:15,
?VERSION_0:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
@ -1893,7 +1933,8 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
<<?COMMAND_METADATA:16,
<<?REQUEST:1,
?COMMAND_METADATA:15,
?VERSION_0:16,
CorrelationId:32,
StreamCount:32,
@ -2018,7 +2059,8 @@ handle_frame_post_auth(Transport,
end,
<<StreamCount:32>>, Streams),
Frame =
<<?COMMAND_METADATA:16,
<<?RESPONSE:1,
?COMMAND_METADATA:15,
?VERSION_0:16,
CorrelationId:32,
BrokersBin/binary,
@ -2101,7 +2143,8 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(Transport,
Connection,
State,
<<?COMMAND_CLOSE:16,
<<?REQUEST:1,
?COMMAND_CLOSE:15,
?VERSION_0:16,
CorrelationId:32,
ClosingCode:16,
@ -2111,7 +2154,8 @@ handle_frame_post_auth(Transport,
rabbit_log:info("Received close command ~p ~p~n",
[ClosingCode, ClosingReason]),
Frame =
<<?COMMAND_CLOSE:16,
<<?RESPONSE:1,
?COMMAND_CLOSE:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16>>,
@ -2121,7 +2165,7 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(_Transport,
Connection,
State,
<<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
<<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame post auth~n"),
{Connection, State, Rest};
@ -2131,7 +2175,8 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
CloseReason = <<"unknown frame">>,
CloseReasonLength = byte_size(CloseReason),
CloseFrame =
<<?COMMAND_CLOSE:16,
<<?REQUEST:1,
?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_UNKNOWN_FRAME:16,
@ -2168,7 +2213,8 @@ notify_connection_closed(#stream_connection{name = Name,
handle_frame_post_close(_Transport,
Connection,
State,
<<?COMMAND_CLOSE:16,
<<?RESPONSE:1,
?COMMAND_CLOSE:15,
?VERSION_0:16,
_CorrelationId:32,
_ResponseCode:16>>,
@ -2179,7 +2225,7 @@ handle_frame_post_close(_Transport,
handle_frame_post_close(_Transport,
Connection,
State,
<<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
<<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame post close~n"),
{Connection, State, Rest};
@ -2380,7 +2426,10 @@ response(Transport,
CorrelationId,
ResponseCode) ->
Transport:send(S,
[<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>,
[<<?RESPONSE_FRAME_SIZE:32,
?RESPONSE:1,
CommandId:15,
?VERSION_0:16>>,
<<CorrelationId:32>>, <<ResponseCode:16>>]).
subscription_exists(StreamSubscriptions, SubscriptionId) ->
@ -2399,7 +2448,8 @@ send_file_callback(Transport,
FrameSize = 2 + 2 + 1 + Size,
FrameBeginning =
<<FrameSize:32,
?COMMAND_DELIVER:16,
?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_0:16,
SubscriptionId:8/unsigned>>,
Transport:send(S, FrameBeginning),

View File

@ -183,13 +183,18 @@ test_server(Port) ->
test_peer_properties(S) ->
PeerPropertiesFrame =
<<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>,
<<?REQUEST:1,
?COMMAND_PEER_PROPERTIES:15,
?VERSION_0:16,
1:32,
0:32>>,
PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame),
gen_tcp:send(S,
<<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>),
{ok,
<<_Size:32,
?COMMAND_PEER_PROPERTIES:16,
?RESPONSE:1,
?COMMAND_PEER_PROPERTIES:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@ -198,7 +203,7 @@ test_peer_properties(S) ->
test_authenticate(S) ->
SaslHandshakeFrame =
<<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>,
<<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, 1:32>>,
SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame),
gen_tcp:send(S,
<<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>),
@ -209,7 +214,8 @@ test_authenticate(S) ->
ok =
case SaslAvailable of
<<31:32,
?COMMAND_SASL_HANDSHAKE:16,
?RESPONSE:1,
?COMMAND_SASL_HANDSHAKE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@ -220,7 +226,8 @@ test_authenticate(S) ->
AmqPlain:8/binary>> ->
ok;
<<31:32,
?COMMAND_SASL_HANDSHAKE:16,
?RESPONSE:1,
?COMMAND_SASL_HANDSHAKE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@ -241,7 +248,8 @@ test_authenticate(S) ->
PlainSaslSize = byte_size(PlainSasl),
SaslAuthenticateFrame =
<<?COMMAND_SASL_AUTHENTICATE:16,
<<?REQUEST:1,
?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
2:32,
5:16,
@ -256,7 +264,8 @@ test_authenticate(S) ->
SaslAuthenticateFrame/binary>>),
{ok,
<<10:32,
?COMMAND_SASL_AUTHENTICATE:16,
?RESPONSE:1,
?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
2:32,
?RESPONSE_CODE_OK:16,
@ -265,7 +274,8 @@ test_authenticate(S) ->
TuneExpected =
<<12:32,
?COMMAND_TUNE:16,
?REQUEST:1,
?COMMAND_TUNE:15,
?VERSION_0:16,
?DEFAULT_FRAME_MAX:32,
?DEFAULT_HEARTBEAT:32>>,
@ -277,14 +287,19 @@ test_authenticate(S) ->
end,
TuneFrame =
<<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>,
<<?RESPONSE:1,
?COMMAND_TUNE:15,
?VERSION_0:16,
?DEFAULT_FRAME_MAX:32,
0:32>>,
TuneFrameSize = byte_size(TuneFrame),
gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>),
VirtualHost = <<"/">>,
VirtualHostLength = byte_size(VirtualHost),
OpenFrame =
<<?COMMAND_OPEN:16,
<<?REQUEST:1,
?COMMAND_OPEN:15,
?VERSION_0:16,
3:32,
VirtualHostLength:16,
@ -293,7 +308,8 @@ test_authenticate(S) ->
gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>),
{ok,
<<10:32,
?COMMAND_OPEN:16,
?RESPONSE:1,
?COMMAND_OPEN:15,
?VERSION_0:16,
3:32,
?RESPONSE_CODE_OK:16>>} =
@ -302,7 +318,8 @@ test_authenticate(S) ->
test_create_stream(S, Stream) ->
StreamSize = byte_size(Stream),
CreateStreamFrame =
<<?COMMAND_CREATE_STREAM:16,
<<?REQUEST:1,
?COMMAND_CREATE_STREAM:15,
?VERSION_0:16,
1:32,
StreamSize:16,
@ -312,7 +329,8 @@ test_create_stream(S, Stream) ->
gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>),
{ok,
<<_Size:32,
?COMMAND_CREATE_STREAM:16,
?RESPONSE:1,
?COMMAND_CREATE_STREAM:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
@ -321,7 +339,8 @@ test_create_stream(S, Stream) ->
test_delete_stream(S, Stream) ->
StreamSize = byte_size(Stream),
DeleteStreamFrame =
<<?COMMAND_DELETE_STREAM:16,
<<?REQUEST:1,
?COMMAND_DELETE_STREAM:15,
?VERSION_0:16,
1:32,
StreamSize:16,
@ -331,7 +350,8 @@ test_delete_stream(S, Stream) ->
ResponseFrameSize = 10,
{ok,
<<ResponseFrameSize:32,
?COMMAND_DELETE_STREAM:16,
?RESPONSE:1,
?COMMAND_DELETE_STREAM:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
@ -340,7 +360,8 @@ test_delete_stream(S, Stream) ->
test_declare_publisher(S, PublisherId, Stream) ->
StreamSize = byte_size(Stream),
DeclarePublisherFrame =
<<?COMMAND_DECLARE_PUBLISHER:16,
<<?REQUEST:1,
?COMMAND_DECLARE_PUBLISHER:15,
?VERSION_0:16,
1:32,
PublisherId:8,
@ -352,7 +373,8 @@ test_declare_publisher(S, PublisherId, Stream) ->
Res = gen_tcp:recv(S, 0, 5000),
{ok,
<<_Size:32,
?COMMAND_DECLARE_PUBLISHER:16,
?RESPONSE:1,
?COMMAND_DECLARE_PUBLISHER:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@ -363,7 +385,8 @@ test_declare_publisher(S, PublisherId, Stream) ->
test_publish_confirm(S, PublisherId, Body) ->
BodySize = byte_size(Body),
PublishFrame =
<<?COMMAND_PUBLISH:16,
<<?REQUEST:1,
?COMMAND_PUBLISH:15,
?VERSION_0:16,
PublisherId:8,
1:32,
@ -374,7 +397,8 @@ test_publish_confirm(S, PublisherId, Body) ->
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
{ok,
<<_Size:32,
?COMMAND_PUBLISH_CONFIRM:16,
?REQUEST:1,
?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16,
PublisherId:8,
1:32,
@ -384,7 +408,8 @@ test_publish_confirm(S, PublisherId, Body) ->
test_subscribe(S, SubscriptionId, Stream) ->
StreamSize = byte_size(Stream),
SubscribeFrame =
<<?COMMAND_SUBSCRIBE:16,
<<?REQUEST:1,
?COMMAND_SUBSCRIBE:15,
?VERSION_0:16,
1:32,
SubscriptionId:8,
@ -398,7 +423,8 @@ test_subscribe(S, SubscriptionId, Stream) ->
Res = gen_tcp:recv(S, 0, 5000),
{ok,
<<_Size:32,
?COMMAND_SUBSCRIBE:16,
?RESPONSE:1,
?COMMAND_SUBSCRIBE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@ -410,7 +436,8 @@ test_deliver(S, Rest, SubscriptionId, Body) ->
BodySize = byte_size(Body),
Frame = read_frame(S, Rest),
<<58:32,
?COMMAND_DELIVER:16,
?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_0:16,
SubscriptionId:8,
5:4/unsigned,
@ -434,7 +461,8 @@ test_metadata_update_stream_deleted(S, Stream) ->
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
{ok,
<<FrameSize:32,
?COMMAND_METADATA_UPDATE:16,
?REQUEST:1,
?COMMAND_METADATA_UPDATE:15,
?VERSION_0:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
@ -445,7 +473,8 @@ test_close(S) ->
CloseReason = <<"OK">>,
CloseReasonSize = byte_size(CloseReason),
CloseFrame =
<<?COMMAND_CLOSE:16,
<<?REQUEST:1,
?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@ -455,7 +484,8 @@ test_close(S) ->
gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>),
{ok,
<<10:32,
?COMMAND_CLOSE:16,
?RESPONSE:1,
?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =