[#13524] Some enhance behavior for nacos mcp client. (#13627)

* Refactor ReleaseMcpServerRequestHandler throw exception when target mcp server and version has been released.

* Nacos AI service support subscribe target mcp server version.
This commit is contained in:
杨翊 SionYang 2025-07-22 17:09:14 +08:00 committed by GitHub
parent aa69ad8075
commit 00e579e765
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 217 additions and 93 deletions

View File

@ -99,15 +99,15 @@ public class ReleaseMcpServerRequestHandler extends RequestHandler<ReleaseMcpSer
meta.getConnectionId());
ReleaseMcpServerResponse response = new ReleaseMcpServerResponse();
try {
// mcp server and version found, do nothing, directly return mcpId to client.
// mcp server and version found, means this version of mcp server has been release, throw exception.
McpServerBasicInfo existMcpServer = mcpServerOperationService.getMcpServerDetail(namespaceId,
serverSpecification.getId(), serverSpecification.getName(),
serverSpecification.getVersionDetail().getVersion());
String version = existMcpServer.getVersionDetail().getVersion();
response.setMcpId(existMcpServer.getId());
response.setMessage(String.format("Mcp Server %s and target version %s already exist, do not do release",
existMcpServer.getName(), version));
LOGGER.info("Mcp Server {} and target version {} already exist.", existMcpServer.getName(), version);
throw new NacosApiException(NacosException.CONFLICT, ErrorCode.MCP_SERVER_VERSION_EXIST,
String.format("Mcp Server %s and target version %s already exist, do not do release",
existMcpServer.getName(), version));
} catch (NacosApiException e) {
if (ErrorCode.MCP_SERVER_NOT_FOUND.getCode() == e.getDetailErrCode()) {
// mcp server not found, create new mcp server.

View File

@ -98,10 +98,7 @@ class ReleaseMcpServerRequestHandlerTest {
when(mcpServerOperationService.getMcpServerDetail(AiConstants.Mcp.MCP_DEFAULT_NAMESPACE, null, "test",
"1.0.0")).thenReturn(detailInfo);
when(meta.getConnectionId()).thenReturn("111");
ReleaseMcpServerResponse response = requestHandler.handle(request, meta);
assertEquals(detailInfo.getId(), response.getMcpId());
assertEquals("Mcp Server test and target version 1.0.0 already exist, do not do release",
response.getMessage());
assertThrows(NacosApiException.class, () -> requestHandler.handle(request, meta));
}
@Test

View File

@ -113,7 +113,20 @@ public interface AiService {
* @return The detail info of mcp server at current time
* @throws NacosException if request parameter is invalid or handle error
*/
McpServerDetailInfo subscribeMcpServer(String mcpName, AbstractNacosMcpServerListener mcpServerListener) throws NacosException;
default McpServerDetailInfo subscribeMcpServer(String mcpName, AbstractNacosMcpServerListener mcpServerListener) throws NacosException {
return subscribeMcpServer(mcpName, null, mcpServerListener);
}
/**
* Subscribe mcp server.
*
* @param mcpName name of mcp server
* @param version version of mcp server
* @param mcpServerListener listener of mcp server, callback when mcp server is changed
* @return The detail info of mcp server at current time
* @throws NacosException if request parameter is invalid or handle error
*/
McpServerDetailInfo subscribeMcpServer(String mcpName, String version, AbstractNacosMcpServerListener mcpServerListener) throws NacosException;
/**
* Un-subscribe mcp server.
@ -122,7 +135,19 @@ public interface AiService {
* @param mcpServerListener listener of mcp server
* @throws NacosException if request parameter is invalid or handle error
*/
void unsubscribeMcpServer(String mcpName, AbstractNacosMcpServerListener mcpServerListener) throws NacosException;
default void unsubscribeMcpServer(String mcpName, AbstractNacosMcpServerListener mcpServerListener) throws NacosException {
unsubscribeMcpServer(mcpName, null, mcpServerListener);
}
/**
* Un-subscribe mcp server.
*
* @param mcpName name of mcp server
* @param version version of mcp server
* @param mcpServerListener listener of mcp server
* @throws NacosException if request parameter is invalid or handle error
*/
void unsubscribeMcpServer(String mcpName, String version, AbstractNacosMcpServerListener mcpServerListener) throws NacosException;
/**
* Shutdown the AI service and close resources.

View File

@ -235,10 +235,18 @@ public enum ErrorCode {
*/
API_FUNCTION_DISABLED(40001, "API function disabled."),
/**
* MCP Server not found any version.
*/
MCP_SERVER_NOT_FOUND(50000, "MCP server not found"),
/**
* MCP Server target version not found.
*/
MCP_SEVER_VERSION_NOT_FOUND(50001, "MCP server version not found"),
MCP_SERVER_VERSION_EXIST(50002, "MCP server version has existed"),
/**
* Config use 100001 ~ 100999.
**/

View File

@ -61,14 +61,14 @@ class AiServiceDefaultMethodTest {
}
@Override
public McpServerDetailInfo subscribeMcpServer(String mcpName,
public McpServerDetailInfo subscribeMcpServer(String mcpName, String version,
AbstractNacosMcpServerListener mcpServerListener) throws NacosException {
return null;
}
@Override
public void unsubscribeMcpServer(String mcpName, AbstractNacosMcpServerListener mcpServerListener)
throws NacosException {
public void unsubscribeMcpServer(String mcpName, String version,
AbstractNacosMcpServerListener mcpServerListener) throws NacosException {
}
@Override

View File

@ -140,8 +140,8 @@ public class NacosAiService implements AiService {
}
@Override
public McpServerDetailInfo subscribeMcpServer(String mcpName, AbstractNacosMcpServerListener mcpServerListener)
throws NacosException {
public McpServerDetailInfo subscribeMcpServer(String mcpName, String version,
AbstractNacosMcpServerListener mcpServerListener) throws NacosException {
if (StringUtils.isBlank(mcpName)) {
throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.PARAMETER_MISSING,
"parameters `mcpName` can't be empty or null");
@ -151,8 +151,8 @@ public class NacosAiService implements AiService {
"parameters `mcpServerListener` can't be empty or null");
}
McpServerListenerInvoker listenerInvoker = new McpServerListenerInvoker(mcpServerListener);
mcpServerNotifier.registerListener(mcpName, listenerInvoker);
McpServerDetailInfo result = grpcClient.subscribeMcpServer(mcpName);
mcpServerNotifier.registerListener(mcpName, version, listenerInvoker);
McpServerDetailInfo result = grpcClient.subscribeMcpServer(mcpName, version);
if (!listenerInvoker.isInvoked()) {
listenerInvoker.invoke(new NacosMcpServerEvent(result));
}
@ -160,7 +160,7 @@ public class NacosAiService implements AiService {
}
@Override
public void unsubscribeMcpServer(String mcpName, AbstractNacosMcpServerListener mcpServerListener)
public void unsubscribeMcpServer(String mcpName, String version, AbstractNacosMcpServerListener mcpServerListener)
throws NacosException {
if (StringUtils.isBlank(mcpName)) {
throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.PARAMETER_MISSING,
@ -170,9 +170,9 @@ public class NacosAiService implements AiService {
return;
}
McpServerListenerInvoker listenerInvoker = new McpServerListenerInvoker(mcpServerListener);
mcpServerNotifier.deregisterListener(mcpName, listenerInvoker);
mcpServerNotifier.deregisterListener(mcpName, version, listenerInvoker);
if (!mcpServerNotifier.isSubscribed(mcpName)) {
grpcClient.unsubscribeMcpServer(mcpName);
grpcClient.unsubscribeMcpServer(mcpName, version);
}
}

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.ai.model.mcp.McpServerDetailInfo;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.client.ai.event.McpServerChangedEvent;
import com.alibaba.nacos.client.ai.remote.AiGrpcClient;
import com.alibaba.nacos.client.ai.utils.McpServerUtils;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.lifecycle.Closeable;
@ -77,7 +78,7 @@ public class NacosMcpServerCacheHolder implements Closeable {
}
public McpServerDetailInfo getMcpServer(String mcpName, String version) {
String key = buildCacheKey(mcpName, version);
String key = McpServerUtils.buildMcpServerKey(mcpName, version);
return mcpServerCache.get(key);
}
@ -90,16 +91,16 @@ public class NacosMcpServerCacheHolder implements Closeable {
String mcpName = detailInfo.getName();
String version = detailInfo.getVersionDetail().getVersion();
Boolean isLatest = detailInfo.getVersionDetail().getIs_latest();
String key = buildCacheKey(mcpName, version);
String key = McpServerUtils.buildMcpServerKey(mcpName, version);
McpServerDetailInfo oldMcpServer = mcpServerCache.get(key);
mcpServerCache.put(key, detailInfo);
if (null != isLatest && isLatest) {
String latestVersionKey = buildCacheKey(mcpName, null);
String latestVersionKey = McpServerUtils.buildMcpServerKey(mcpName, null);
mcpServerCache.put(latestVersionKey, detailInfo);
}
if (isMcpServerChanged(oldMcpServer, detailInfo)) {
LOGGER.info("mcp server {} changed.", detailInfo.getName());
NotifyCenter.publishEvent(new McpServerChangedEvent(detailInfo.getName(), detailInfo));
NotifyCenter.publishEvent(new McpServerChangedEvent(detailInfo));
}
}
@ -107,10 +108,12 @@ public class NacosMcpServerCacheHolder implements Closeable {
* Add new update task for mcp server.
*
* @param mcpName name of mcp server
* @param version version of mcp server
*/
public void addMcpServerUpdateTask(String mcpName) {
this.updateTaskMap.computeIfAbsent(mcpName, s -> {
McpServerUpdater updateTask = new McpServerUpdater(mcpName);
public void addMcpServerUpdateTask(String mcpName, String version) {
String mcpServerKey = McpServerUtils.buildMcpServerKey(mcpName, version);
this.updateTaskMap.computeIfAbsent(mcpServerKey, s -> {
McpServerUpdater updateTask = new McpServerUpdater(mcpName, version);
updaterExecutor.schedule(updateTask, updateIntervalMillis, TimeUnit.MILLISECONDS);
return updateTask;
});
@ -120,9 +123,11 @@ public class NacosMcpServerCacheHolder implements Closeable {
* Remove new update task for mcp server.
*
* @param mcpName name of mcp server
* @param version version of mcp server
*/
public void removeMcpServerUpdateTask(String mcpName) {
McpServerUpdater updateTask = this.updateTaskMap.remove(mcpName);
public void removeMcpServerUpdateTask(String mcpName, String version) {
String mcpServerKey = McpServerUtils.buildMcpServerKey(mcpName, version);
McpServerUpdater updateTask = this.updateTaskMap.remove(mcpServerKey);
if (null != updateTask) {
updateTask.cancel();
}
@ -146,13 +151,6 @@ public class NacosMcpServerCacheHolder implements Closeable {
return false;
}
private String buildCacheKey(String mcpName, String version) {
if (StringUtils.isBlank(version)) {
version = "latest";
}
return mcpName + "::" + version;
}
@Override
public void shutdown() throws NacosException {
this.updaterExecutor.shutdownNow();
@ -162,10 +160,13 @@ public class NacosMcpServerCacheHolder implements Closeable {
private final String mcpName;
private final String version;
private final AtomicBoolean cancel;
public McpServerUpdater(String mcpName) {
public McpServerUpdater(String mcpName, String version) {
this.mcpName = mcpName;
this.version = version;
this.cancel = new AtomicBoolean(false);
}
@ -175,7 +176,7 @@ public class NacosMcpServerCacheHolder implements Closeable {
return;
}
try {
McpServerDetailInfo detailInfo = aiGrpcClient.queryMcpServer(mcpName, null);
McpServerDetailInfo detailInfo = aiGrpcClient.queryMcpServer(mcpName, version);
processMcpServerDetailInfo(detailInfo);
} catch (Exception e) {
LOGGER.warn("Mcp server updater execute query failed", e);

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.client.ai.event;
import com.alibaba.nacos.api.ai.listener.NacosMcpServerEvent;
import com.alibaba.nacos.client.ai.utils.McpServerUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.CollectionUtils;
@ -42,11 +43,12 @@ public class McpServerChangeNotifier extends Subscriber<McpServerChangedEvent> {
@Override
public void onEvent(McpServerChangedEvent event) {
if (!isSubscribed(event.getMcpName())) {
String mcpServerKey = McpServerUtils.buildMcpServerKey(event.getMcpName(), event.getVersion());
if (!isSubscribed(mcpServerKey)) {
return;
}
NacosMcpServerEvent notifiedEvent = new NacosMcpServerEvent(event.getMcpServer());
for (McpServerListenerInvoker each : mcpServerListenerInvokers.get(event.getMcpName())) {
for (McpServerListenerInvoker each : mcpServerListenerInvokers.get(mcpServerKey)) {
each.invoke(notifiedEvent);
}
}
@ -60,13 +62,15 @@ public class McpServerChangeNotifier extends Subscriber<McpServerChangedEvent> {
* register listener.
*
* @param mcpName name of mcp server
* @param version version of mcp server
* @param listenerInvoker listener invoker
*/
public void registerListener(String mcpName, McpServerListenerInvoker listenerInvoker) {
public void registerListener(String mcpName, String version, McpServerListenerInvoker listenerInvoker) {
if (listenerInvoker == null) {
return;
}
mcpServerListenerInvokers.compute(mcpName, (key, mcpServerListenerInvokers) -> {
String mcpServerKey = McpServerUtils.buildMcpServerKey(mcpName, version);
mcpServerListenerInvokers.compute(mcpServerKey, (key, mcpServerListenerInvokers) -> {
if (null == mcpServerListenerInvokers) {
mcpServerListenerInvokers = new ConcurrentHashSet<>();
}
@ -79,13 +83,15 @@ public class McpServerChangeNotifier extends Subscriber<McpServerChangedEvent> {
* deregister listener.
*
* @param mcpName name of mcp server
* @param version version of mcp server
* @param listenerInvoker listener invoker
*/
public void deregisterListener(String mcpName, McpServerListenerInvoker listenerInvoker) {
public void deregisterListener(String mcpName, String version, McpServerListenerInvoker listenerInvoker) {
if (listenerInvoker == null) {
return;
}
mcpServerListenerInvokers.compute(mcpName, (key, mcpServerListenerInvokers) -> {
String mcpServerKey = McpServerUtils.buildMcpServerKey(mcpName, version);
mcpServerListenerInvokers.compute(mcpServerKey, (key, mcpServerListenerInvokers) -> {
if (null == mcpServerListenerInvokers) {
return null;
}

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.client.ai.event;
import com.alibaba.nacos.api.ai.model.mcp.McpServerDetailInfo;
import com.alibaba.nacos.client.ai.utils.McpServerUtils;
import com.alibaba.nacos.common.notify.Event;
/**
@ -30,17 +31,29 @@ public class McpServerChangedEvent extends Event {
private final String mcpName;
private final String version;
private final McpServerDetailInfo mcpServer;
public McpServerChangedEvent(String mcpName, McpServerDetailInfo mcpServer) {
this.mcpName = mcpName;
public McpServerChangedEvent(McpServerDetailInfo mcpServer) {
this.mcpServer = mcpServer;
this.mcpName = mcpServer.getName();
this.version = buildVersion(mcpServer);
}
private String buildVersion(McpServerDetailInfo mcpServer) {
return mcpServer.getVersionDetail().getIs_latest() ? McpServerUtils.LATEST_VERSION
: mcpServer.getVersionDetail().getVersion();
}
public String getMcpName() {
return mcpName;
}
public String getVersion() {
return version;
}
public McpServerDetailInfo getMcpServer() {
return mcpServer;
}

View File

@ -249,19 +249,20 @@ public class AiGrpcClient implements Closeable {
* Subscribe mcp server latest version.
*
* @param mcpName name of mcp server
* @param version version of mcp server
* @return latest version mcp server
* @throws NacosException if request parameter is invalid or handle error
*/
public McpServerDetailInfo subscribeMcpServer(String mcpName) throws NacosException {
public McpServerDetailInfo subscribeMcpServer(String mcpName, String version) throws NacosException {
if (!isAbilitySupportedByServer(AbilityKey.SERVER_MCP_REGISTRY)) {
throw new NacosRuntimeException(NacosException.SERVER_NOT_IMPLEMENTED,
"Request Nacos server version is too low, not support mcp registry feature.");
}
McpServerDetailInfo cachedServer = mcpServerCacheHolder.getMcpServer(mcpName, null);
McpServerDetailInfo cachedServer = mcpServerCacheHolder.getMcpServer(mcpName, version);
if (null == cachedServer) {
cachedServer = queryMcpServer(mcpName, null);
cachedServer = queryMcpServer(mcpName, version);
mcpServerCacheHolder.processMcpServerDetailInfo(cachedServer);
mcpServerCacheHolder.addMcpServerUpdateTask(mcpName);
mcpServerCacheHolder.addMcpServerUpdateTask(mcpName, version);
}
return cachedServer;
}
@ -270,14 +271,15 @@ public class AiGrpcClient implements Closeable {
* Un-subscribe mcp server.
*
* @param mcpName name of mcp server
* @param version version of mcp server
* @throws NacosException if request parameter is invalid or handle error
*/
public void unsubscribeMcpServer(String mcpName) throws NacosException {
public void unsubscribeMcpServer(String mcpName, String version) throws NacosException {
if (!isAbilitySupportedByServer(AbilityKey.SERVER_MCP_REGISTRY)) {
throw new NacosRuntimeException(NacosException.SERVER_NOT_IMPLEMENTED,
"Request Nacos server version is too low, not support mcp registry feature.");
}
mcpServerCacheHolder.removeMcpServerUpdateTask(mcpName);
mcpServerCacheHolder.removeMcpServerUpdateTask(mcpName, version);
}
public boolean isEnable() {

View File

@ -0,0 +1,43 @@
/*
* Copyright 1999-2025 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.ai.utils;
import com.alibaba.nacos.common.utils.StringUtils;
/**
* Nacos AI module mcp server utils.
*
* @author xiweng.yy
*/
public class McpServerUtils {
public static final String LATEST_VERSION = "latest";
/**
* Build mcp server versioned key.
*
* @param mcpName name of mcp server
* @param version version of mcp server, if version is blank or null, use latest version
* @return mcp server versioned key, pattern ${mcpName}::${version}
*/
public static String buildMcpServerKey(String mcpName, String version) {
if (StringUtils.isBlank(version)) {
version = LATEST_VERSION;
}
return mcpName + "::" + version;
}
}

View File

@ -46,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -169,10 +170,10 @@ class NacosAiServiceTest {
injectMocks();
AbstractNacosMcpServerListener listener = Mockito.mock(AbstractNacosMcpServerListener.class);
McpServerDetailInfo expected = new McpServerDetailInfo();
when(grpcClient.subscribeMcpServer("testMcpName")).thenReturn(expected);
when(grpcClient.subscribeMcpServer("testMcpName", null)).thenReturn(expected);
McpServerDetailInfo actual = nacosAiService.subscribeMcpServer("testMcpName", listener);
assertEquals(expected, actual);
verify(mcpServerNotifier).registerListener(eq("testMcpName"), any(McpServerListenerInvoker.class));
verify(mcpServerNotifier).registerListener(eq("testMcpName"), isNull(), any(McpServerListenerInvoker.class));
verify(listener).onEvent(any(NacosMcpServerEvent.class));
}
@ -187,8 +188,8 @@ class NacosAiServiceTest {
injectMocks();
AbstractNacosMcpServerListener listener = Mockito.mock(AbstractNacosMcpServerListener.class);
nacosAiService.unsubscribeMcpServer("testMcpName", listener);
verify(mcpServerNotifier).deregisterListener(eq("testMcpName"), any(McpServerListenerInvoker.class));
verify(grpcClient).unsubscribeMcpServer("testMcpName");
verify(mcpServerNotifier).deregisterListener(eq("testMcpName"), isNull(), any(McpServerListenerInvoker.class));
verify(grpcClient).unsubscribeMcpServer("testMcpName", null);
}
@Test
@ -197,16 +198,16 @@ class NacosAiServiceTest {
when(mcpServerNotifier.isSubscribed("testMcpName")).thenReturn(true);
AbstractNacosMcpServerListener listener = Mockito.mock(AbstractNacosMcpServerListener.class);
nacosAiService.unsubscribeMcpServer("testMcpName", listener);
verify(mcpServerNotifier).deregisterListener(eq("testMcpName"), any(McpServerListenerInvoker.class));
verify(grpcClient, never()).unsubscribeMcpServer("testMcpName");
verify(mcpServerNotifier).deregisterListener(eq("testMcpName"), isNull(), any(McpServerListenerInvoker.class));
verify(grpcClient, never()).unsubscribeMcpServer("testMcpName", null);
}
@Test
void unsubscribeMcpServerWithNullListener() throws NoSuchFieldException, IllegalAccessException, NacosException {
injectMocks();
nacosAiService.unsubscribeMcpServer("testMcpName", null);
verify(mcpServerNotifier, never()).deregisterListener(eq("testMcpName"), any(McpServerListenerInvoker.class));
verify(grpcClient, never()).unsubscribeMcpServer("testMcpName");
verify(mcpServerNotifier, never()).deregisterListener(eq("testMcpName"), isNull(), any(McpServerListenerInvoker.class));
verify(grpcClient, never()).unsubscribeMcpServer("testMcpName", null);
}
@Test

View File

@ -77,6 +77,7 @@ class NacosMcpServerCacheHolderTest {
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
mcpServerDetailInfo.getVersionDetail().setIs_latest(true);
cacheHolder.processMcpServerDetailInfo(mcpServerDetailInfo);
assertNotNull(cacheHolder.getMcpServer("test", "1.0.0"));
assertEquals(mcpServerDetailInfo, cacheHolder.getMcpServer("test", "1.0.0"));
@ -122,11 +123,13 @@ class NacosMcpServerCacheHolderTest {
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
mcpServerDetailInfo.getVersionDetail().setIs_latest(true);
cacheHolder.processMcpServerDetailInfo(mcpServerDetailInfo);
mcpServerDetailInfo = new McpServerDetailInfo();
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
mcpServerDetailInfo.getVersionDetail().setIs_latest(true);
mcpServerDetailInfo.setProtocol(AiConstants.Mcp.MCP_PROTOCOL_STDIO);
MockEventSubscriber subscriber = new MockEventSubscriber();
@ -150,6 +153,7 @@ class NacosMcpServerCacheHolderTest {
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
mcpServerDetailInfo.getVersionDetail().setIs_latest(true);
cacheHolder.processMcpServerDetailInfo(mcpServerDetailInfo);
MockEventSubscriber subscriber = new MockEventSubscriber();
@ -194,12 +198,12 @@ class NacosMcpServerCacheHolderTest {
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
when(aiGrpcClient.queryMcpServer("test", null)).thenReturn(mcpServerDetailInfo);
cacheHolder.addMcpServerUpdateTask("test");
when(aiGrpcClient.queryMcpServer("test", "1.0.0")).thenReturn(mcpServerDetailInfo);
cacheHolder.addMcpServerUpdateTask("test", "1.0.0");
TimeUnit.MILLISECONDS.sleep(110);
assertNotNull(cacheHolder.getMcpServer("test", "1.0.0"));
TimeUnit.MILLISECONDS.sleep(110);
verify(aiGrpcClient, times(2)).queryMcpServer("test", null);
verify(aiGrpcClient, times(2)).queryMcpServer("test", "1.0.0");
}
@Test
@ -209,12 +213,12 @@ class NacosMcpServerCacheHolderTest {
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
when(aiGrpcClient.queryMcpServer("test", null)).thenThrow(new RuntimeException("test"));
cacheHolder.addMcpServerUpdateTask("test");
when(aiGrpcClient.queryMcpServer("test", "1.0.0")).thenThrow(new RuntimeException("test"));
cacheHolder.addMcpServerUpdateTask("test", "1.0.0");
TimeUnit.MILLISECONDS.sleep(110);
assertNull(cacheHolder.getMcpServer("test", "1.0.0"));
TimeUnit.MILLISECONDS.sleep(110);
verify(aiGrpcClient, times(2)).queryMcpServer("test", null);
verify(aiGrpcClient, times(2)).queryMcpServer("test", "1.0.0");
}
@Test
@ -224,13 +228,13 @@ class NacosMcpServerCacheHolderTest {
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
when(aiGrpcClient.queryMcpServer("test", null)).thenReturn(mcpServerDetailInfo);
cacheHolder.addMcpServerUpdateTask("test");
when(aiGrpcClient.queryMcpServer("test", "1.0.0")).thenReturn(mcpServerDetailInfo);
cacheHolder.addMcpServerUpdateTask("test", "1.0.0");
TimeUnit.MILLISECONDS.sleep(110);
assertNotNull(cacheHolder.getMcpServer("test", "1.0.0"));
cacheHolder.removeMcpServerUpdateTask("test");
cacheHolder.removeMcpServerUpdateTask("test", "1.0.0");
TimeUnit.MILLISECONDS.sleep(110);
verify(aiGrpcClient).queryMcpServer("test", null);
verify(aiGrpcClient).queryMcpServer("test", "1.0.0");
}
@Test
@ -240,8 +244,8 @@ class NacosMcpServerCacheHolderTest {
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
cacheHolder.addMcpServerUpdateTask("test");
cacheHolder.removeMcpServerUpdateTask("test");
cacheHolder.addMcpServerUpdateTask("test", "1.0.0");
cacheHolder.removeMcpServerUpdateTask("test", "1.0.0");
TimeUnit.MILLISECONDS.sleep(110);
verify(aiGrpcClient, never()).queryMcpServer("test", null);
}

View File

@ -19,6 +19,7 @@ package com.alibaba.nacos.client.ai.event;
import com.alibaba.nacos.api.ai.listener.AbstractNacosMcpServerListener;
import com.alibaba.nacos.api.ai.listener.NacosMcpServerEvent;
import com.alibaba.nacos.api.ai.model.mcp.McpServerDetailInfo;
import com.alibaba.nacos.api.ai.model.mcp.registry.ServerVersionDetail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -39,10 +40,17 @@ class McpServerChangeNotifierTest {
private AtomicBoolean invokedMark;
private McpServerDetailInfo mcpServerDetailInfo;
@BeforeEach
void setUp() {
changeNotifier = new McpServerChangeNotifier();
invokedMark = new AtomicBoolean(false);
mcpServerDetailInfo = new McpServerDetailInfo();
mcpServerDetailInfo.setName("test");
mcpServerDetailInfo.setVersionDetail(new ServerVersionDetail());
mcpServerDetailInfo.getVersionDetail().setVersion("1.0.0");
mcpServerDetailInfo.getVersionDetail().setIs_latest(true);
}
@AfterEach
@ -51,7 +59,7 @@ class McpServerChangeNotifierTest {
@Test
void onEventWithoutListener() {
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent("test", new McpServerDetailInfo())));
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent(mcpServerDetailInfo)));
}
@Test
@ -63,8 +71,24 @@ class McpServerChangeNotifierTest {
}
};
McpServerListenerInvoker invoker = new McpServerListenerInvoker(listener);
changeNotifier.registerListener("test", invoker);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent("test", new McpServerDetailInfo())));
changeNotifier.registerListener("test", null, invoker);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent(mcpServerDetailInfo)));
assertTrue(invokedMark.get());
assertTrue(invoker.isInvoked());
}
@Test
void onEventNotLatestVersion() {
AbstractNacosMcpServerListener listener = new AbstractNacosMcpServerListener() {
@Override
public void onEvent(NacosMcpServerEvent event) {
invokedMark.set(true);
}
};
McpServerListenerInvoker invoker = new McpServerListenerInvoker(listener);
changeNotifier.registerListener("test", "1.0.0", invoker);
mcpServerDetailInfo.getVersionDetail().setIs_latest(false);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent(mcpServerDetailInfo)));
assertTrue(invokedMark.get());
assertTrue(invoker.isInvoked());
}
@ -80,9 +104,9 @@ class McpServerChangeNotifierTest {
AbstractNacosMcpServerListener listener2 = Mockito.mock(AbstractNacosMcpServerListener.class);
McpServerListenerInvoker invoker = new McpServerListenerInvoker(listener);
McpServerListenerInvoker invoker2 = new McpServerListenerInvoker(listener2);
changeNotifier.registerListener("test", invoker);
changeNotifier.registerListener("test", invoker2);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent("test", new McpServerDetailInfo())));
changeNotifier.registerListener("test", null, invoker);
changeNotifier.registerListener("test", null, invoker2);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent(mcpServerDetailInfo)));
assertTrue(invokedMark.get());
assertTrue(invoker.isInvoked());
assertTrue(invoker2.isInvoked());
@ -90,26 +114,26 @@ class McpServerChangeNotifierTest {
invokedMark.set(false);
reset(listener2);
changeNotifier.deregisterListener("test", invoker2);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent("test", new McpServerDetailInfo())));
changeNotifier.deregisterListener("test", null, invoker2);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent(mcpServerDetailInfo)));
assertTrue(invokedMark.get());
verify(listener2, Mockito.never()).onEvent(any(NacosMcpServerEvent.class));
invokedMark.set(false);
changeNotifier.deregisterListener("test", invoker);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent("test", new McpServerDetailInfo())));
changeNotifier.deregisterListener("test", null, invoker);
assertDoesNotThrow(() -> changeNotifier.onEvent(new McpServerChangedEvent(mcpServerDetailInfo)));
assertFalse(invokedMark.get());
}
@Test
void registerNullListener() {
changeNotifier.registerListener("test", null);
changeNotifier.registerListener("test", null, null);
assertFalse(changeNotifier.isSubscribed("test"));
}
@Test
void deregisterNullListener() {
changeNotifier.deregisterListener("test", null);
changeNotifier.deregisterListener("test", null, null);
assertFalse(changeNotifier.isSubscribed("test"));
}
@ -122,7 +146,7 @@ class McpServerChangeNotifierTest {
}
};
McpServerListenerInvoker invoker = new McpServerListenerInvoker(listener);
changeNotifier.deregisterListener("test", invoker);
changeNotifier.deregisterListener("test", null, invoker);
assertFalse(changeNotifier.isSubscribed("test"));
}
}

View File

@ -200,9 +200,9 @@ class AiGrpcClientTest {
QueryMcpServerResponse response = new QueryMcpServerResponse();
response.setMcpServerDetailInfo(mcpServerDetailInfo);
when(rpcClient.request(any(QueryMcpServerRequest.class))).thenReturn(response);
assertEquals(mcpServerDetailInfo, aiGrpcClient.subscribeMcpServer("test"));
assertEquals(mcpServerDetailInfo, aiGrpcClient.subscribeMcpServer("test", null));
verify(mcpServerCacheHolder).processMcpServerDetailInfo(mcpServerDetailInfo);
verify(mcpServerCacheHolder).addMcpServerUpdateTask("test");
verify(mcpServerCacheHolder).addMcpServerUpdateTask("test", null);
}
@Test
@ -211,18 +211,18 @@ class AiGrpcClientTest {
when(rpcClient.getConnectionAbility(AbilityKey.SERVER_MCP_REGISTRY)).thenReturn(AbilityStatus.SUPPORTED);
McpServerDetailInfo mcpServerDetailInfo = new McpServerDetailInfo();
when(mcpServerCacheHolder.getMcpServer("test", null)).thenReturn(mcpServerDetailInfo);
assertEquals(mcpServerDetailInfo, aiGrpcClient.subscribeMcpServer("test"));
assertEquals(mcpServerDetailInfo, aiGrpcClient.subscribeMcpServer("test", null));
verify(rpcClient, never()).request(any(QueryMcpServerRequest.class));
verify(mcpServerCacheHolder, never()).processMcpServerDetailInfo(mcpServerDetailInfo);
verify(mcpServerCacheHolder, never()).addMcpServerUpdateTask("test");
verify(mcpServerCacheHolder, never()).addMcpServerUpdateTask("test", null);
}
@Test
void unsubscribeMcpServer() throws NoSuchFieldException, IllegalAccessException, NacosException {
injectMock();
when(rpcClient.getConnectionAbility(AbilityKey.SERVER_MCP_REGISTRY)).thenReturn(AbilityStatus.SUPPORTED);
aiGrpcClient.unsubscribeMcpServer("test");
verify(mcpServerCacheHolder).removeMcpServerUpdateTask("test");
aiGrpcClient.unsubscribeMcpServer("test", null);
verify(mcpServerCacheHolder).removeMcpServerUpdateTask("test", null);
}
@Test
@ -263,14 +263,14 @@ class AiGrpcClientTest {
void subscribeMcpServerWithFeatureDisabled() throws NoSuchFieldException, IllegalAccessException {
injectMock();
when(rpcClient.getConnectionAbility(AbilityKey.SERVER_MCP_REGISTRY)).thenReturn(AbilityStatus.NOT_SUPPORTED);
assertThrows(NacosRuntimeException.class, () -> aiGrpcClient.subscribeMcpServer("test"));
assertThrows(NacosRuntimeException.class, () -> aiGrpcClient.subscribeMcpServer("test", null));
}
@Test
void unsubscribeMcpServerWithFeatureDisabled() throws NoSuchFieldException, IllegalAccessException {
injectMock();
when(rpcClient.getConnectionAbility(AbilityKey.SERVER_MCP_REGISTRY)).thenReturn(AbilityStatus.NOT_SUPPORTED);
assertThrows(NacosRuntimeException.class, () -> aiGrpcClient.unsubscribeMcpServer("test"));
assertThrows(NacosRuntimeException.class, () -> aiGrpcClient.unsubscribeMcpServer("test", null));
}
@Test