Support direct shard database operation routing in Spring JDBC (#31506)
Introduce ShardingKeyDataSourceAdapter to get shard connections. This commit introduces a DataSource proxy, that changes the behavior of the getConnection method to use the `createConnectionBuilder()` api to acquire direct shard connections. The shard connection is acquired by specifying a `ShardingKey` that is correspondent to the wanted shard.
This commit is contained in:
parent
d919930d83
commit
e4e2224449
|
|
@ -0,0 +1,38 @@
|
|||
package org.springframework.jdbc.core;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.sql.ShardingKey;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
/**
|
||||
* Interface defines methods for retrieving sharding keys, which are used to establish
|
||||
* direct shard connections (in the context of sharded databases). This is used as a
|
||||
* way of providing the sharding key in
|
||||
* {@link org.springframework.jdbc.datasource.ShardingKeyDataSourceAdapter}.
|
||||
*
|
||||
* @author Mohamed Lahyane (Anir)
|
||||
*/
|
||||
|
||||
|
||||
public interface ShardingKeyProvider {
|
||||
/**
|
||||
* Retrieves the sharding key. This method returns the sharding key relevant to the current context,
|
||||
* which will be used to obtain a direct shard connection.
|
||||
*
|
||||
* @return The sharding key, or null if it is not available or cannot be determined.
|
||||
* @throws SQLException If an error occurs while obtaining the sharding key.
|
||||
*/
|
||||
@Nullable
|
||||
ShardingKey getShardingKey() throws SQLException;
|
||||
|
||||
/**
|
||||
* Retrieves the super sharding key. This method returns the super sharding key relevant to the
|
||||
* current context, which will be used to obtain a direct shard connection.
|
||||
*
|
||||
* @return The super sharding key, or null if it is not available or cannot be determined.
|
||||
* @throws SQLException If an error occurs while obtaining the super sharding key.
|
||||
*/
|
||||
@Nullable
|
||||
ShardingKey getSuperShardingKey() throws SQLException;
|
||||
}
|
||||
|
|
@ -0,0 +1,130 @@
|
|||
package org.springframework.jdbc.datasource;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ConnectionBuilder;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.ShardingKey;
|
||||
import java.sql.ShardingKeyBuilder;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
import org.springframework.core.NamedThreadLocal;
|
||||
import org.springframework.jdbc.core.ShardingKeyProvider;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* An adapter for a target {@link DataSource}, designed to apply sharding keys, if specified,
|
||||
* to every standard {@code getConnection()} call, returning a direct connection to the shard
|
||||
* corresponding to the specified sharding key value. All other methods are simply delegated
|
||||
* to the corresponding methods of the target DataSource.
|
||||
*
|
||||
* <p>The target {@link DataSource} must implement the {@code createConnectionBuilder()} method;
|
||||
* otherwise, a {@link java.sql.SQLFeatureNotSupportedException} will be thrown when attempting
|
||||
* to acquire shard connections.</p>
|
||||
*
|
||||
* <p>This proxy datasource takes a {@link ShardingKeyProvider} object as an attribute,
|
||||
* which is used to get the sharding keys.</p>
|
||||
*
|
||||
* @author Mohamed Lahyane (Anir)
|
||||
* @see #getConnection
|
||||
* @see #createConnectionBuilder()
|
||||
* @see UserCredentialsDataSourceAdapter
|
||||
*/
|
||||
public class ShardingKeyDataSourceAdapter extends DelegatingDataSource {
|
||||
@Nullable
|
||||
private ShardingKeyProvider shardingkeyProvider;
|
||||
|
||||
/**
|
||||
* Creates a new instance of ShardingKeyDataSourceAdapter, wrapping the given {@link DataSource}.
|
||||
*
|
||||
* @param dataSource the target DataSource to be wrapped.
|
||||
*/
|
||||
public ShardingKeyDataSourceAdapter(DataSource dataSource) {
|
||||
super(dataSource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of ShardingKeyDataSourceAdapter, wrapping the given {@link DataSource}.
|
||||
*
|
||||
* @param dataSource the target DataSource to be wrapped.
|
||||
* @param shardingKeyProvider the ShardingKeyProvider used to get the shardingKeys.
|
||||
*/
|
||||
public ShardingKeyDataSourceAdapter(DataSource dataSource, ShardingKeyProvider shardingKeyProvider) {
|
||||
super(dataSource);
|
||||
this.shardingkeyProvider = shardingKeyProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link ShardingKeyProvider} for this adapter.
|
||||
*
|
||||
* @param shardingKeyProvider the ShardingKeyProvider to set.
|
||||
*/
|
||||
public void setShardingKeyProvider(ShardingKeyProvider shardingKeyProvider) {
|
||||
this.shardingkeyProvider = shardingKeyProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtains a connection to the database shard using the provided sharding key
|
||||
* and super sharding key (if available).
|
||||
* <p>the sharding key is obtained from the thread local storage, if is {@code null},
|
||||
* it is obtained from the {@link ShardingKeyProvider}.</p>
|
||||
*
|
||||
* @return a Connection object representing a direct shard connection.
|
||||
* @throws SQLException if an error occurs while creating the connection.
|
||||
* @see #createConnectionBuilder()
|
||||
*/
|
||||
@Override
|
||||
public Connection getConnection() throws SQLException {
|
||||
return createConnectionBuilder().build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtains a connection to the database shard using the provided username and password,
|
||||
* considering the sharding keys (if available) and the given credentials.
|
||||
*
|
||||
* @param username the database user on whose behalf the connection is being made.
|
||||
* @param password the user's password.
|
||||
* @return a Connection object representing a direct shard connection.
|
||||
* @throws SQLException if an error occurs while creating the connection.
|
||||
*/
|
||||
@Override
|
||||
public Connection getConnection(String username, String password) throws SQLException {
|
||||
return createConnectionBuilder().user(username).password(password).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of {@link ConnectionBuilder} using the target DataSource's
|
||||
* {@code createConnectionBuilder()} method, and sets the appropriate sharding keys
|
||||
* from the thread-local storage or the {@link ShardingKeyProvider}.
|
||||
*
|
||||
* @return a ConnectionBuilder object representing a builder for direct shard connections.
|
||||
* @throws SQLException if an error occurs while creating the ConnectionBuilder.
|
||||
*/
|
||||
@Override
|
||||
public ConnectionBuilder createConnectionBuilder() throws SQLException {
|
||||
ConnectionBuilder connectionBuilder = obtainTargetDataSource().createConnectionBuilder();
|
||||
|
||||
ShardingKey shardingKey = null;
|
||||
ShardingKey superShardingKey = null;
|
||||
|
||||
if (shardingkeyProvider != null) {
|
||||
shardingKey = shardingkeyProvider.getShardingKey();
|
||||
superShardingKey = shardingkeyProvider.getSuperShardingKey();
|
||||
}
|
||||
|
||||
return connectionBuilder.shardingKey(shardingKey).superShardingKey(superShardingKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of {@link ShardingKeyBuilder} using the target DataSource's
|
||||
* {@code createShardingKeyBuilder()} method.
|
||||
*
|
||||
* @return a ShardingKeyBuilder object representing a builder for sharding keys.
|
||||
* @throws SQLException if an error occurs while creating the ShardingKeyBuilder.
|
||||
*/
|
||||
@Override
|
||||
public ShardingKeyBuilder createShardingKeyBuilder() throws SQLException {
|
||||
return obtainTargetDataSource().createShardingKeyBuilder();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,97 @@
|
|||
package org.springframework.jdbc.datasource;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ConnectionBuilder;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.ShardingKey;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.jdbc.core.ShardingKeyProvider;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
import static org.mockito.BDDMockito.*;
|
||||
|
||||
public class ShardingKeyDataSourceAdapterTests {
|
||||
private final Connection connection = mock();
|
||||
private final Connection shardConnection = mock();
|
||||
private final DataSource dataSource = mock();
|
||||
private final ConnectionBuilder connectionBuilder = mock(ConnectionBuilder.class, RETURNS_DEEP_STUBS);
|
||||
private final ConnectionBuilder shardConnectionBuilder = mock(ConnectionBuilder.class, RETURNS_DEEP_STUBS);
|
||||
private final ShardingKey shardingKey = mock();
|
||||
private final ShardingKey superShardingKey = mock();
|
||||
private final ShardingKeyProvider shardingKeyProvider = new ShardingKeyProvider() {
|
||||
@Override
|
||||
public ShardingKey getShardingKey() throws SQLException {
|
||||
return shardingKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardingKey getSuperShardingKey() throws SQLException {
|
||||
return superShardingKey;
|
||||
}
|
||||
};
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws SQLException {
|
||||
given(dataSource.createConnectionBuilder()).willReturn(connectionBuilder);
|
||||
when(connectionBuilder.shardingKey(null).superShardingKey(null)).thenReturn(connectionBuilder);
|
||||
when(connectionBuilder.shardingKey(shardingKey).superShardingKey(superShardingKey))
|
||||
.thenReturn(shardConnectionBuilder);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConnectionNoKeyProvider() throws SQLException {
|
||||
ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(dataSource);
|
||||
|
||||
when(connectionBuilder.build()).thenReturn(connection);
|
||||
|
||||
assertThat(dataSourceAdapter.getConnection()).isEqualTo(connection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConnectionWithKeyProvider() throws SQLException {
|
||||
|
||||
ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(
|
||||
dataSource,
|
||||
shardingKeyProvider);
|
||||
|
||||
when(shardConnectionBuilder.build()).thenReturn(shardConnection);
|
||||
|
||||
assertThat(dataSourceAdapter.getConnection()).isEqualTo(shardConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConnectionWithCredentialsNoKeyProvider() throws SQLException {
|
||||
ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(dataSource);
|
||||
|
||||
String username = "Anir";
|
||||
String password = "spring";
|
||||
|
||||
Connection userConnection = mock();
|
||||
|
||||
when(connectionBuilder.user(username).password(password).build()).thenReturn(userConnection);
|
||||
|
||||
assertThat(dataSourceAdapter.getConnection(username, password)).isEqualTo(userConnection);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConnectionWithCredentialsAndKeyProvider() throws SQLException {
|
||||
ShardingKeyDataSourceAdapter dataSourceAdapter = new ShardingKeyDataSourceAdapter(
|
||||
dataSource,
|
||||
shardingKeyProvider);
|
||||
|
||||
String username = "mbekraou";
|
||||
String password = "jdbc";
|
||||
|
||||
Connection userWithKeyProviderConnection = mock();
|
||||
|
||||
when(shardConnectionBuilder.user(username).password(password).build())
|
||||
.thenReturn(userWithKeyProviderConnection);
|
||||
|
||||
assertThat(dataSourceAdapter.getConnection(username, password)).isEqualTo(userWithKeyProviderConnection);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue