距离上一次更新该文章已经过了 697 天,文章所描述的內容可能已经发生变化,请留意。
分布式事务
<谨供参考>
分布式事务顾名思义就是要在分布式系统中实现事务,它其实是由多个本地事务组合而成。
关于分布式事务目前也有许多种解决方案,常说的几种如2pc,3pc,TCC,本地消息表,消息事务,最大努力通知
关于几种方案的介绍可以看下敖丙的文章<分布式事务的六种解决方案>
无论是哪种解决方案,目的都是希望保证多个系统的事务方法统一提交,要么全成,全么全败(原子性),所以这篇文章指在建立这样的想法上,手写一个分布式事务解决方案的demo,以为之后的分布式事务框架以及知识学习做积累
在开发之前需要知道一点,spring的事务管理是基于(jdbc/java.sql.xxx)进行拓展,所以我们可以从这里着手


图中画了四个角色
- 服务调用者(server1)
- 服务被调用者(server2)
- 全局事务管理者(tx-manager)
- 中间协调者(global-tx)-负责和tx-manager交互的
整个调用链路如下:
1 2 3 4 5 6 7 8 9 10
| 1. 访问server1接口,server1做update操作 2. 在server1做update之前先获取@Xwtransactional的value值,判断它是不是事务组的开始方,如果是,则先创建一个全局事务组,然后将自己加入此事务组中 3. 走XwDataSourceAspect获取到我们自己的XwConnection 4. 调用server2执行update操作 5. 然后再走一遍刚才server1的过程,只不过server2是事务组的结束方,所以它直接去取刚才的那条事务组并加入进去即可(ThreadLocal) 6. server2执行完本地方法后调用XwConnection.commit,要提交的时候先不提交,等TxManager的通知再提交(这里为了不影响后续的操作,使用守护线程wait的方式) 7. XwTransactionManager.addXwTransaction,组装数据发送到tx-manager 8. tx-manager判断是否已经接收到结束事务的标记,比较事务是否已经全部到达,如果已经全部到达则看是否需要回滚,然后发送回global-tx,由global-tx唤醒当前XwTransaction,执行commit/rollback操作 9. server2结束调用,回到server1 10. 执行剩余方法,随后再走一遍server2的过程(6-8)
|
server1 和server2 的代码很简单,就是两个update数据库的操作
- tx-manager,负责和中间协调者通信交互 1. 接收并保存每个分布式事务 2.通知每个子事务进行提交或者回滚,这里使用netty进行数据的交互通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static Map<String, List<String>> transactionTypeMap = new HashMap<>(); private static Map<String, Boolean> isEndMap = new HashMap<>(); private static Map<String, Integer> transactionCountMap = new HashMap<>();
[@Override](https: public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.add(ctx.channel()); }
@Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("接收数据:" + msg.toString()); JSONObject jsonObject = JSON.parseObject((String) msg); String command = jsonObject.getString("command"); String groupId = jsonObject.getString("groupId"); String transactionType = jsonObject.getString("transactionType"); Integer transactionCount = jsonObject.getInteger("transactionCount"); Boolean isEnd = jsonObject.getBoolean("isEnd"); if ("create".equals(command)) { transactionTypeMap.put(groupId, new ArrayList<>()); } else if ("add".equals(command)) { transactionTypeMap.get(groupId).add(transactionType); if (isEnd) { isEndMap.put(groupId, true); transactionCountMap.put(groupId, transactionCount); } JSONObject result = new JSONObject(); result.put("groupId", groupId); if (isEndMap.get(groupId) && transactionCountMap.get(groupId).equals(transactionTypeMap.get(groupId).size())) { if (transactionTypeMap.get(groupId).contains("rollback")){ result.put("command", "rollback"); sendResult(result); } else { result.put("command", "commit"); sendResult(result); } } } } private void sendResult(JSONObject result) { for (Channel channel : channelGroup) { System.out.println("发送数据:" + result.toJSONString()); channel.writeAndFlush(result.toJSONString()); } } }
|
XwTransactionAspect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Around("@annotation(org.xiaowu.txmanager.annotation.Xwtransactional)") public void invoke(ProceedingJoinPoint point) { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); Xwtransactional lbAnnotation = method.getAnnotation(Xwtransactional.class); String groupId = ""; if (lbAnnotation.isStart()) {
groupId = XwTransactionManager.createXwTransactionGroup(); } else {
groupId = XwTransactionManager.getCurrentGroupId(); }
XwTransaction xwTransaction = XwTransactionManager.createXwTransaction(groupId);
try { point.proceed(); XwTransactionManager.addXwTransaction(xwTransaction, lbAnnotation.isEnd(), TransactionType.commit); } catch (Exception e) { XwTransactionManager.addXwTransaction(xwTransaction, lbAnnotation.isEnd(), TransactionType.rollback); e.printStackTrace(); } catch (Throwable throwable) { XwTransactionManager.addXwTransaction(xwTransaction, lbAnnotation.isEnd(), TransactionType.rollback); throwable.printStackTrace(); } }
|
XwTransactionManager
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
public static String createXwTransactionGroup() { String groupId = UUID.randomUUID().toString(); JSONObject jsonObject = new JSONObject(); jsonObject.put("groupId", groupId); jsonObject.put("command", "create"); nettyClient.send(jsonObject); System.out.println("创建事务组");
currentGroupId.set(groupId); return groupId; }
public static XwTransaction createXwTransaction(String groupId) { String transactionId = UUID.randomUUID().toString(); XwTransaction xwTransaction = new XwTransaction(groupId, transactionId); XW_TRANSACTION_MAP.put(groupId, xwTransaction); currentXwTransaction.set(xwTransaction); addTransactionCount();
System.out.println("创建事务");
return xwTransaction; }
public static XwTransaction addXwTransaction(XwTransaction xwTransaction, Boolean isEnd, TransactionType transactionType) { JSONObject jsonObject = new JSONObject(); jsonObject.put("groupId", xwTransaction.getGroupId()); jsonObject.put("transactionId", xwTransaction.getTransactionId()); jsonObject.put("transactionType", transactionType); jsonObject.put("command", "add"); jsonObject.put("isEnd", isEnd); jsonObject.put("transactionCount", XwTransactionManager.getTransactionCount()); nettyClient.send(jsonObject); System.out.println("添加事务"); return xwTransaction; }
|
XwConnection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public void commit() throws SQLException {
new Thread(() -> { try { System.out.println("commit wait..."); xwTransaction.getTask().waitTask(); if (xwTransaction.getTransactionType().equals(TransactionType.rollback)) { connection.rollback(); } else { connection.commit(); } connection.close(); } catch (SQLException e) { e.printStackTrace(); } }).start(); }
@Override public void rollback() throws SQLException { new Thread(() -> { try { System.out.println("rollback wait..."); xwTransaction.getTask().waitTask(); connection.rollback(); connection.close(); } catch (SQLException e) { e.printStackTrace(); } }).start();
}
@Override public void close() throws SQLException {
}
|
还有一些小注意的点,比如server1在调用server2时候,为了让server2知道groupid,可以把它放在header里.


源码仓库地址: 分布式事务学习