今天我们将在上一课时的基础上,详细展开 ShardingSphere 中分布式事务的具体实现过程。首先,我们将介绍支持强一致性事务的 XAShardingTransactionManager。
XAShardingTransactionManager
让我们回到 ShardingSphere,来到 sharding-transaction-xa-core 工程的 XAShardingTransactionManager 类,该类是分布式事务的 XA 实现类。
我们先来看 XAShardingTransactionManager 类的定义和所包含的变量:
1
2
3
4
5
6
7
|
public final class XAShardingTransactionManager implements ShardingTransactionManager {
private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();
private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();
}
|
可以看到 XAShardingTransactionManager 实现了上一课时中介绍的 ShardingTransactionManager 接口,并保存着一组 XATransactionDataSource。同时,XATransactionManager 实例的加载仍然是采用了 JDK 中的 ServiceLoader 类,如下所示:
1
2
3
4
5
6
7
8
9
10
11
|
private XATransactionManager load() {
Iterator<XATransactionManager> xaTransactionManagers = ServiceLoader.load(XATransactionManager.class).iterator();
if (!xaTransactionManagers.hasNext()) {
return new AtomikosTransactionManager();
}
XATransactionManager result = xaTransactionManagers.next();
if (xaTransactionManagers.hasNext()) {
log.warn("There are more than one transaction mangers existing, chosen first one by default.");
}
return result;
}
|
XATransactionManager 就是对各种第三方 XA 事务管理器的一种抽象,通过上述代码,可以看到在找不到合适的 XATransactionManager 的情况下,系统默认会创建一个 AtomikosTransactionManager。而这个 XATransactionManager 的定义实际上是位于单独的一个代码工程中,即 sharding-transaction-xa-spi 工程,该接口定义如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public interface XATransactionManager extends AutoCloseable {
//初始化 XA 事务管理器
void init();
//注册事务恢复资源
void registerRecoveryResource(String dataSourceName, XADataSource xaDataSource);
//移除事务恢复资源
void removeRecoveryResource(String dataSourceName, XADataSource xaDataSource);
//嵌入一个 SingleXAResource 资源
void enlistResource(SingleXAResource singleXAResource);
//返回 TransactionManager
TransactionManager getTransactionManager();
}
|
这些接口方法从命名上基本可以理解其含义,但详细的用法我们还是要结合具体的 XATransactionManager 实现类进行理解。这里我们还发现了一个 SingleXAResource,这个类同样位于 sharding-transaction-xa-spi 工程中,从名称上看,应该是对 JTA 中 XAResource 接口的一种实现,我们来看一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public final class SingleXAResource implements XAResource {
private final String resourceName;
private final XAResource delegate;
@Override
public void start(final Xid xid, final int i) throws XAException {
delegate.start(xid, i);
}
@Override
public void commit(final Xid xid, final boolean b) throws XAException {
delegate.commit(xid, b);
}
@Override
public void rollback(final Xid xid) throws XAException {
delegate.rollback(xid);
}
@Override
public boolean isSameRM(final XAResource xaResource) {
SingleXAResource singleXAResource = (SingleXAResource) xaResource;
return resourceName.equals(singleXAResource.getResourceName());
}
…
}
|
可以看到 SingleXAResource 虽然实现了 JTA 的 XAResource 接口,但更像是一个代理类,具体的操作方法还是委托给了内部的 XAResource 进行实现。
接下来,我们将围绕 XA 分布式事务中的几个核心类展开讨论。
1.XADataSource
XADataSource 属于 JDBC 规范中的内容,我们在“03 | 规范兼容:JDBC 规范与 ShardingSphere 是什么关系?”中已经提到过这个接口,该接口的作用就是获取 XAConnection。
那么 XADataSource 是如何构建出来的呢?我们首先找到了一个 XADataSourceFactory 工厂类,显然该类负责生成具体的 XADataSource,如下所示的就是完成这一工作的 build 方法:
1
2
3
4
5
6
7
|
public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
XADataSourceDefinition xaDataSourceDefinition = XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType);
XADataSource result = createXADataSource(xaDataSourceDefinition);
Properties xaProperties = xaDataSourceDefinition.getXAProperties(SWAPPER.swap(dataSource));
PropertyUtils.setProperties(result, xaProperties);
return result;
}
|
这里首先用到了一个 XADataSourceDefinition 接口,从命名上看应该是关于 XADataSource 的一种定义,如下所示:
1
2
3
4
5
6
7
8
|
public interface XADataSourceDefinition extends DatabaseTypeAwareSPI {
//获取 XA 驱动类名
Collection<String> getXADriverClassName();
//获取 XA 属性
Properties getXAProperties(DatabaseAccessConfiguration databaseAccessConfiguration);
}
|
可以看到这个接口继承了 DatabaseTypeAwareSPI,从命名上看这也是一个 SPI 接口,其定义如下所示:
1
2
3
4
|
public interface DatabaseTypeAwareSPI {
//获取数据库类型
String getDatabaseType();
}
|
在 ShardingSphere 中,继承 DatabaseTypeAwareSPI 接口的就只有 XADataSourceDefinition 接口,而后者存在一批实现类,整体的类层结构如下所示:
XADataSourceDefinition 的实现类
这里以 MySQLXADataSourceDefinition 为例展开讨论,该类分别实现了 DatabaseTypeAwareSPI 和 XADataSourceDefinition 这两个接口中所定义的三个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public final class MySQLXADataSourceDefinition implements XADataSourceDefinition {
@Override
public String getDatabaseType() {
return "MySQL";
}
@Override
public Collection<String> getXADriverClassName() {
return Arrays.asList("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "com.mysql.cj.jdbc.MysqlXADataSource");
}
@Override
public Properties getXAProperties(final DatabaseAccessConfiguration databaseAccessConfiguration) {
Properties result = new Properties();
result.setProperty("user", databaseAccessConfiguration.getUsername());
result.setProperty("password", Optional.fromNullable(databaseAccessConfiguration.getPassword()).or(""));
result.setProperty("URL", databaseAccessConfiguration.getUrl());
…
return result;
}
}
|
我们从这里得知,作为数据库供应商,MySQL 提供了两个 XADataSource 的驱动程序。而在 getXAProperties 中,我们发现 URL、Username 和 Password 等信息是通过 DatabaseAccessConfiguration 对象进行获取的,该对象在本文后面会介绍到。
另一方面,因为 DatabaseTypeAwareSPI 接口命名中带有 SPI,所以我们不难想象各种 XADataSourceDefinition 实际上也是基于 SPI 机制进行加载的,这在用于获取 XADataSourceDefinition 的工厂类 XADataSourceDefinitionFactory 中可以得到确认:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public final class XADataSourceDefinitionFactory {
private static final Map<DatabaseType, XADataSourceDefinition> XA_DATA_SOURCE_DEFINITIONS = new HashMap<>();
static {
//通过 ServiceLoader 加载 XADataSourceDefinition
for (XADataSourceDefinition each : ServiceLoader.load(XADataSourceDefinition.class)) {
XA_DATA_SOURCE_DEFINITIONS.put(DatabaseTypes.getActualDatabaseType(each.getDatabaseType()), each);
}
}
public static XADataSourceDefinition getXADataSourceDefinition(final DatabaseType databaseType) {
return XA_DATA_SOURCE_DEFINITIONS.get(databaseType);
}
}
|
同样,在 sharding-transaction-xa-core 工程中,我们也发现了如下所示的 SPI 配置信息:
sharding-transaction-xa-core 工程中的 SPI 配置
当根据数据库类型获取了对应的 XADataSourceDefinition 之后,我们就可以根据 XADriverClassName 来创建具体的 XADataSource:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
private static XADataSource loadXADataSource(final String xaDataSourceClassName) {
Class xaDataSourceClass;
try {
//加载 XADataSource 实现类
xaDataSourceClass = Thread.currentThread().getContextClassLoader().loadClass(xaDataSourceClassName);
} catch (final ClassNotFoundException ignored) {
try {
xaDataSourceClass = Class.forName(xaDataSourceClassName);
} catch (final ClassNotFoundException ex) {
throw new ShardingException("Failed to load [%s]", xaDataSourceClassName);
}
}
try {
return (XADataSource) xaDataSourceClass.newInstance();
} catch (final InstantiationException | IllegalAccessException ex) {
throw new ShardingException("Failed to instance [%s]", xaDataSourceClassName);
}
}
|
这里会先从当前线程的 ContextClassLoader 中加载目标驱动的实现类,如果加载不到,就直接通过反射进行创建,最后返回 XADataSource 的实例对象。
当获取了 XADataSource 的实例对象之后,我们需要设置它的属性,这部分工作是由 DataSourceSwapper 类来完成的。在这里,ShardingSphere 针对不同类型的数据库连接池工具还专门做了一层封装,提取了 DataSourcePropertyProvider 接口用于对 DataSource的URL 、Username 和 Password 等基础信息进行抽象。
DataSourcePropertyProvider 接口的定义如下所示:
1
2
3
4
5
6
|
public interface DataSourcePropertyProvider {
String getDataSourceClassName();
String getURLPropertyName();
String getUsernamePropertyName();
String getPasswordPropertyName();
}
|
DataSourcePropertyProvider 的实现类有两个,一个是 DefaultDataSourcePropertyProvider,另一个是 HikariCPPropertyProvider。ShardingSphere 默认使用的是 HikariCPPropertyProvider,这点可以从如下所示的 SPI 配置文件中得到确认:
DataSourcePropertyProvider 的 SPI 配置
HikariCPPropertyProvider 实现了 DataSourcePropertyProvider 接口,并包含了对这些基础信息的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public final class HikariCPPropertyProvider implements DataSourcePropertyProvider {
@Override
public String getDataSourceClassName() {
return "com.zaxxer.hikari.HikariDataSource";
}
@Override
public String getURLPropertyName() {
return "jdbcUrl";
}
@Override
public String getUsernamePropertyName() {
return "username";
}
@Override
public String getPasswordPropertyName() {
return "password";
}
}
|
然后在 DataSourceSwapper 的 swap 方法中,实际上就是通过反射来构建 findGetterMethod 工具方法,并以此获取 URL、Username 和 Password 等基础信息,并返回一个 DatabaseAccessConfiguration 对象供具体的 XADataSourceDefinition 进行使用。
swap 方法的实现如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
|
public DatabaseAccessConfiguration swap(final DataSource dataSource) {
DataSourcePropertyProvider provider = DataSourcePropertyProviderLoader.getProvider(dataSource);
try {
String url = (String) findGetterMethod(dataSource, provider.getURLPropertyName()).invoke(dataSource);
String username = (String) findGetterMethod(dataSource, provider.getUsernamePropertyName()).invoke(dataSource);
String password = (String) findGetterMethod(dataSource, provider.getPasswordPropertyName()).invoke(dataSource);
return new DatabaseAccessConfiguration(url, username, password);
} catch (final ReflectiveOperationException ex) {
throw new ShardingException("Cannot swap data source type: `%s`, please provide an implementation from SPI `%s`",
dataSource.getClass().getName(), DataSourcePropertyProvider.class.getName());
}
}
|
至此,我们对 XADataSource 的构建过程描述完毕。这个过程不算复杂,但涉及的类比较多,值得我们以 XADataSourceFactory 为中心画一张类图作为总结:
2.XAConnection
讲完 XADataSource,我们接着来讲 XAConnection,XAConnection 同样是 JDBC 规范中的接口。
负责创建 XAConnection 的工厂类 XAConnectionFactory 如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public final class XAConnectionFactory {
//基于普通 Connection 创建 XAConnection
public static XAConnection createXAConnection(final DatabaseType databaseType, final XADataSource xaDataSource, final Connection connection) {
switch (databaseType.getName()) {
case "MySQL":
return new MySQLXAConnectionWrapper().wrap(xaDataSource, connection);
case "MariaDB":
return new MariaDBXAConnectionWrapper().wrap(xaDataSource, connection);
case "PostgreSQL":
return new PostgreSQLXAConnectionWrapper().wrap(xaDataSource, connection);
case "H2":
return new H2XAConnectionWrapper().wrap(xaDataSource, connection);
default:
throw new UnsupportedOperationException(String.format("Cannot support database type: `%s`", databaseType));
}
}
}
|
可以看到,相较 XADataSource,创建 XAConnection 的过程就显得直接明了。这里通过一个 switch 语句根据数据库类型分别构建了对应的 ConnectionWrapper,然后再调用 wrap 方法返回 XAConnection。
我们还是以 MySQLXAConnectionWrapper 为例来分析具体的实现过程。
MySQLXAConnectionWrapper 实现了 XAConnectionWrapper 接口,所以我们先来看 XAConnectionWrapper 接口的定义:
1
2
3
4
5
|
public interface XAConnectionWrapper {
//基于 XADataSource 把 Connection 包装成 XAConnection
XAConnection wrap(XADataSource xaDataSource, Connection connection);
}
|
XAConnectionWrapper 接口只有一个方法,即根据传入的 XADataSource 和一个普通 Connection 对象创建出一个新的 XAConnection 对象。XAConnectionWrapper 接口的类层结构如下所示:
XAConnectionWrapper 接口的实现类
MySQLXAConnectionWrapper 中的 warp 方法如下所示:
1
2
3
4
5
6
7
8
9
|
@Override
public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
//获取真实 Connection 对象
Connection physicalConnection = unwrapPhysicalConnection(xaDataSource.getClass().getName(), connection);
Method method = xaDataSource.getClass().getDeclaredMethod("wrapConnection", Connection.class);
method.setAccessible(true);
//通过反射包装 Connection 对象
return (XAConnection) method.invoke(xaDataSource, physicalConnection);
}
|
上述方法的流程为先通过 unwrapPhysicalConnection 将传入的 Connection 转变为一个真实的连接对象,然后再基于 XADataSource 的 wrapConnection 方法通过反射对这个物理连接进行包装,从而形成一个 XAConnection 对象。
对于 Mysql 而言,我们在前面的内容中已经知道它有两种 XADataSource 驱动类。而在 MySQLXAConnectionWrapper 我们同样找到了如下所示的这两种驱动类的类名定义:
1
2
3
|
private static final String MYSQL_XA_DATASOURCE_5 = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";
private static final String MYSQL_XA_DATASOURCE_8 = "com.mysql.cj.jdbc.MysqlXADataSource";
|
显然,根据数据库版本的不同,这两个驱动类的行为也有所不同。因此,unwrapPhysicalConnection 的处理过程如下所示:
1
2
3
4
5
6
7
8
9
10
|
private Connection unwrapPhysicalConnection(final String xaDataSourceClassName, final Connection connection) {
switch (xaDataSourceClassName) {
case MYSQL_XA_DATASOURCE_5:
return (Connection) connection.unwrap(Class.forName("com.mysql.jdbc.Connection"));
case MYSQL_XA_DATASOURCE_8:
return (Connection) connection.unwrap(Class.forName("com.mysql.cj.jdbc.JdbcConnection"));
default:
throw new UnsupportedOperationException(String.format("Cannot support xa datasource: `%s`", xaDataSourceClassName));
}
}
|
作为对比,我们再来看 PostgreSQLXAConnectionWrapper,它的 wrap 方法则比较简单,如下所示。显然,这部分内容的理解需要对不同的数据库驱动有一定的了解。
1
2
3
4
|
public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
BaseConnection physicalConnection = (BaseConnection) connection.unwrap(Class.forName("org.postgresql.core.BaseConnection"));
return new PGXAConnection(physicalConnection);
}
|
3.XATransactionDataSource
介绍完了 XADataSource 和 XAConnection 的创建过程之后,让我们回到 XAShardingTransactionManager,我们发现这里用到的 DataSource 并不是 JDBC 中原生的 XADataSource,而是一种 XATransactionDataSource。
我们来到这个 XATransactionDataSource 类,该类的变量和构造函数如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
private final DatabaseType databaseType;
private final String resourceName;
private final DataSource dataSource;
private XADataSource xaDataSource;
private XATransactionManager xaTransactionManager;
public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {
this.databaseType = databaseType;
this.resourceName = resourceName;
this.dataSource = dataSource;
if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
this.xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
this.xaTransactionManager = xaTransactionManager;
xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);
}
}
|
上述变量我们都认识,而在构造函数中,调用了 XATransactionManager 类中的 registerRecoveryResource 方法将构建的 XADataSource 作为一种资源进行注册。
然后,我们来看 XATransactionDataSource 中的核心方法 getConnection,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public Connection getConnection() throws SQLException, SystemException, RollbackException {
if (CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
return dataSource.getConnection();
}
//从DataSource中 构建一个 Connection
Connection result = dataSource.getConnection();
//通过 XAConnectionFactory 创建一个 XAConnection
XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);
//从 XATransactionManager 中获取 Transaction 对象
final Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();
//判当前线程中是否存在这个 Transaction
if (!enlistedTransactions.get().contains(transaction)) {
//将 XAConnection 中的 XAResource 与目标 Transaction 对象关联起来
transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));
//Transaction 中注册一个 Synchronization 接口
transaction.registerSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
enlistedTransactions.get().remove(transaction);
}
@Override
public void afterCompletion(final int status) {
enlistedTransactions.get().clear();
}
});
//将该 Transaction 对象放入到当前线程中
enlistedTransactions.get().add(transaction);
}
return result;
}
|
这里先从 DataSource 中构建一个 Connection,然后传入到 XAConnectionFactory 中创建一个 XAConnection,接着从 XATransactionManager 中获取 Transaction 对象。请注意在 XATransactionDataSource 中存在一个 ThreadLocal 变量 enlistedTransactions,用于保存当前线程所涉及的 Transaction 对象列表:
1
2
3
4
5
6
|
private final ThreadLocal<Set<Transaction>> enlistedTransactions = new ThreadLocal<Set<Transaction>>() {
@Override
public Set<Transaction> initialValue() {
return new HashSet<>();
}
};
|
在上述方法中,当从 XATransactionManager 中获取 Transaction 对象之后,会先判断 enlistedTransactions中 是否存在该 Transaction 对象,如果没有,则将 XAConnection 中的 XAResource 与目标 Transaction 对象关联起来。
然后我们再来看 Transaction 对象的 registerSynchronization 方法的使用方法,该方法注册了一个 Synchronization 接口,该接口包含了 beforeCompletion 和 afterCompletion 这两个方法。
在二阶段提交之前,TransctionManager 会调用 Synchronization 接口的 beforeCompletion 方法;而当事务结束时,TransctionManager 会调用 Synchronization 接口的 afterCompletion方法。我们在 getConnection 方法中看到这两个方法的应用。最终,我们把该 Transaction 对象放入到线程安全的 enlistedTransactions 中。
最后,我们来看一下 XATransactionDataSource 的 close 方法,如下所示:
1
2
3
4
5
6
7
8
|
@Override
public void close() {
if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
xaTransactionManager.removeRecoveryResource(resourceName, xaDataSource);
} else {
close(dataSource);
}
}
|
可以看到,这里调用了 XATransactionManager 的 removeRecoveryResource 方法将资源进行移出。
至此,基于 XATransactionDataSource 获取 Connection 的过程也介绍完毕。关于 XATransactionManager的 具体内容我们放在下一课时中继续进行探讨。
从源码解析到日常开发
ShardingSphere 作为一款完全兼容 JDBC 规范的分布式数据库中间件,同样完成了针对分布式事务中的相关对象的兼容。今天的课程中,进一步强化了我们对 JDBC 规范的理解和如何扩展JDBC 规范中核心接口的方法。同时,在 MySQLXAConnectionWrapper 这个 Wrapper 类中,我们也再次看到使用反射技术创建 XAConnection 对象的实现方法。这些开发技巧都值得我们进行学习和应用。
小结与预告
分布式事务是一个相对复杂的概念,ShardingSphere 中提供了强一致性和最终一致性两种实现方案。今天的内容我们围绕基于 XA 协议的分片事务管理器 XAShardingTransactionManager 展开了讨论,在理解 XAShardingTransactionManager 中 XADataSource、XAConnection 等核心对象时,重点还是需要站在 JDBC 规范的基础上,掌握与分布式事务集成和兼容的整个过程,本课时对这一过程进行了详细的介绍。
这里给你留一道思考题:ShardingSphere 中对分布式环境下的强一致性事务做了哪些维度的抽象?欢迎你在留言区与大家讨论,我将逐一点评解答。
XAShardingTransactionManager 的内容很多,下一课时,我们将在今天课时的基础上,继续探讨 XAShardingTransactionManager 的剩余部分内容,以及 ShardingSphere 中另一个分片事务管理器 SeataATShardingTransactionManager。
课程评价入口,挑选 5 名小伙伴赠送小礼品~
-– ### 精选评论