StompSession supports custom headers for UNSUBSCRIBE

Issue: SPR-15131
This commit is contained in:
Rossen Stoyanchev 2017-01-31 17:55:53 -05:00
parent 949bb55ef5
commit 60517b23e2
3 changed files with 69 additions and 22 deletions

View File

@ -291,8 +291,7 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override @Override
public Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler handler) { public Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler handler) {
String destination = stompHeaders.getDestination(); Assert.hasText(stompHeaders.getDestination(), "Destination header is required");
Assert.hasText(destination, "Destination header is required");
Assert.notNull(handler, "StompFrameHandler must not be null"); Assert.notNull(handler, "StompFrameHandler must not be null");
String subscriptionId = stompHeaders.getId(); String subscriptionId = stompHeaders.getId();
@ -300,8 +299,8 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
subscriptionId = String.valueOf(DefaultStompSession.this.subscriptionIndex.getAndIncrement()); subscriptionId = String.valueOf(DefaultStompSession.this.subscriptionIndex.getAndIncrement());
stompHeaders.setId(subscriptionId); stompHeaders.setId(subscriptionId);
} }
String receiptId = checkOrAddReceipt(stompHeaders); checkOrAddReceipt(stompHeaders);
Subscription subscription = new DefaultSubscription(subscriptionId, destination, receiptId, handler); Subscription subscription = new DefaultSubscription(stompHeaders, handler);
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE); StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE);
accessor.addNativeHeaders(stompHeaders); accessor.addNativeHeaders(stompHeaders);
@ -333,8 +332,11 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
return receiptable; return receiptable;
} }
private void unsubscribe(String id) { private void unsubscribe(String id, StompHeaders stompHeaders) {
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE); StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
if (stompHeaders != null) {
accessor.addNativeHeaders(stompHeaders);
}
accessor.setSubscriptionId(id); accessor.setSubscriptionId(id);
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD); Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message); execute(message);
@ -600,29 +602,27 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
private class DefaultSubscription extends ReceiptHandler implements Subscription { private class DefaultSubscription extends ReceiptHandler implements Subscription {
private final String id; private final StompHeaders headers;
private final String destination;
private final StompFrameHandler handler; private final StompFrameHandler handler;
public DefaultSubscription(String id, String destination, String receiptId, StompFrameHandler handler) { public DefaultSubscription(StompHeaders headers, StompFrameHandler handler) {
super(receiptId); super(headers.getReceipt());
Assert.notNull(destination, "Destination must not be null"); Assert.notNull(headers.getDestination(), "Destination must not be null");
Assert.notNull(handler, "StompFrameHandler must not be null"); Assert.notNull(handler, "StompFrameHandler must not be null");
this.id = id; this.headers = headers;
this.destination = destination;
this.handler = handler; this.handler = handler;
DefaultStompSession.this.subscriptions.put(id, this); DefaultStompSession.this.subscriptions.put(headers.getId(), this);
} }
@Override @Override
public String getSubscriptionId() { public String getSubscriptionId() {
return this.id; return this.headers.getId();
} }
public String getDestination() { @Override
return this.destination; public StompHeaders getSubscriptionHeaders() {
return this.headers;
} }
public StompFrameHandler getHandler() { public StompFrameHandler getHandler() {
@ -631,13 +631,20 @@ public class DefaultStompSession implements ConnectionHandlingStompSession {
@Override @Override
public void unsubscribe() { public void unsubscribe() {
DefaultStompSession.this.subscriptions.remove(getSubscriptionId()); unsubscribe(null);
DefaultStompSession.this.unsubscribe(getSubscriptionId()); }
@Override
public void unsubscribe(StompHeaders stompHeaders) {
String id = this.headers.getId();
DefaultStompSession.this.subscriptions.remove(id);
DefaultStompSession.this.unsubscribe(id, stompHeaders);
} }
@Override @Override
public String toString() { public String toString() {
return "Subscription [id=" + getSubscriptionId() + ", destination='" + getDestination() + return "Subscription [id=" + getSubscriptionId() +
", destination='" + this.headers.getDestination() +
"', receiptId='" + getReceiptId() + "', handler=" + getHandler() + "]"; "', receiptId='" + getReceiptId() + "', handler=" + getHandler() + "]";
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -141,10 +141,22 @@ public interface StompSession {
*/ */
String getSubscriptionId(); String getSubscriptionId();
/**
* Return the headers used on the SUBSCRIBE frame.
*/
StompHeaders getSubscriptionHeaders();
/** /**
* Remove the subscription by sending an UNSUBSCRIBE frame. * Remove the subscription by sending an UNSUBSCRIBE frame.
*/ */
void unsubscribe(); void unsubscribe();
/**
* Alternative to {@link #unsubscribe()} with additional custom headers
* to send to the server.
* <p><strong>Note:</strong> There is no need to set the subscription id.
*/
void unsubscribe(StompHeaders stompHeaders);
} }
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -471,6 +471,34 @@ public class DefaultStompSessionTests {
assertEquals(subscription.getSubscriptionId(), stompHeaders.getId()); assertEquals(subscription.getSubscriptionId(), stompHeaders.getId());
} }
@Test // SPR-15131
public void unsubscribeWithCustomHeader() throws Exception {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
String headerName = "durable-subscription-name";
String headerValue = "123";
StompHeaders subscribeHeaders = new StompHeaders();
subscribeHeaders.setDestination("/topic/foo");
subscribeHeaders.set(headerName, headerValue);
StompFrameHandler frameHandler = mock(StompFrameHandler.class);
Subscription subscription = this.session.subscribe(subscribeHeaders, frameHandler);
StompHeaders unsubscribeHeaders = new StompHeaders();
unsubscribeHeaders.set(headerName, subscription.getSubscriptionHeaders().getFirst(headerName));
subscription.unsubscribe(unsubscribeHeaders);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.UNSUBSCRIBE, accessor.getCommand());
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
assertEquals(stompHeaders.toString(), 2, stompHeaders.size());
assertEquals(subscription.getSubscriptionId(), stompHeaders.getId());
assertEquals(headerValue, stompHeaders.getFirst(headerName));
}
@Test @Test
public void ack() throws Exception { public void ack() throws Exception {
this.session.afterConnected(this.connection); this.session.afterConnected(this.connection);