博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用AOP写2PC框架(二)
阅读量:5235 次
发布时间:2019-06-14

本文共 14666 字,大约阅读时间需要 48 分钟。

AOP的底层已经封装好了以后,我们就要开始针对应用层写具体的业务逻辑了。

也就是说我们需要有个类继承于AopProxyBase,并且重写其After,Bofore以达到我们的拦截记录的功能。代码如下:

public class TransactionProxy : AopProxyBase    {        public TransactionProxy(MarshalByRefObject obj, Type type)            : base(obj, type)        { }        public override void Before(System.Runtime.Remoting.Messaging.IMessage requestMsg, AopMethodAttribute[] attrs)        {        }        public override void After(System.Runtime.Remoting.Messaging.IMessage requestMsg, System.Runtime.Remoting.Messaging.IMessage Respond, AopMethodAttribute[] attrs)        {            foreach (var attr in attrs)            {                if (attr is LogicRollBackTransAttribute)                {                    return;                }            }            var args = requestMsg.Properties["__Args"] as object[];            string methodName = requestMsg.Properties["__MethodName"] as string;            CustomTransaction customTrans = null;            List list = new List();            customTrans = CallContext.GetData(TransKey.CustomTransKey) as CustomTransaction;            if (customTrans != null)            {                list.AddRange(args);                TransactionUnit unit = AppTransactionManage.Instance.GetRollBackInfo(methodName);                if (unit != null)                {                    unit.Argments = list;                    unit.Mark = customTrans.Mark;                }                customTrans.Compensation.Add(unit);                TransQueueManage.Instance.Push                (                    new Model.BankTransLog                    {                        Mark = unit.Mark,                        MethodName = methodName,                        ParamsConfig = JsonHelper.ToJson(unit.Argments),                        Status = 0,                        Type = 0                    }                );                CallContext.SetData(TransKey.CustomTransKey, customTrans);                var outArgs = Respond.Properties["__OutArgs"] as object[];                IDbTransaction dbTrans;                foreach (var attr in attrs)                {                    if (attr is DbTransAttribute || attr is LogicTransAttribute)                    {                        if (outArgs != null)                        {                            foreach (var arg in outArgs)                            {                                if (arg is IDbTransaction)                                {                                    dbTrans = arg as IDbTransaction;                                    if (customTrans != null)                                    {                                        customTrans.AddDbTransaction(dbTrans);                                    }                                }                            }                        }                    }                }            }        }    }
View Code

在After的地方,我们可以看到,我们做了一次LogicRollBackTransAttribute的判定,避免在回调的时候,又再走一次拦截和记录的流程。

同时做了DbTransAttribute和LogicTransAttribute的判定。因为我把事务分为两类,一类是db本身自己控制的,可以直接rollback的,一类是logic的,需要我们去手动通过逻辑回滚的。代码如下:

[AttributeUsage(AttributeTargets.Method)]    public class LogicTransAttribute : AopMethodAttribute    {        public string MethodName { get; set; }        public LogicTransAttribute()        {        }        public LogicTransAttribute(string name)        {            this.MethodName = name;        }    }[AttributeUsage(AttributeTargets.Method)]    public class DbTransAttribute : AopMethodAttribute    {            }
View Code

同时可以看到,我把每一个函数的调用作为一个单元,用TransactionUnit类来保存,代码如下:

public class TransactionUnit    {        public object InstanceObject;        ///         /// 执行的方法        ///         public MethodInfo Forward;        ///         /// 失败回滚的方法        ///         public MethodInfo Rollback;        ///         /// 参数        ///         public IList Argments;        ///         /// 唯一标识        ///         public string Mark;    }
View Code

因为,一个事务里面,可能包含了多次操作redis,或者多次操作db,为了保证线程安全,同时又需要避开锁,我用了CallContext将一个线程里面的一段事务,保存在其线程上下文中。在保存一个完整的TransactionUnit的时候,不可能每一次都去通过反射去取MethodInfo,所以又增加了一段初始化和字典来保存其MethodInfo。代码如下:

public class AppTransactionManage    {        private Dictionary
_transMaps; static AppTransactionManage() { } private AppTransactionManage() { if (this._transMaps == null) { this._transMaps = new Dictionary
(); } } private static AppTransactionManage _instance; public static AppTransactionManage Instance { get { if (_instance == null) { _instance = new AppTransactionManage(); } return _instance; } } public TransactionUnit GetRollBackInfo(string methodName) { if (this._transMaps == null) throw new ArgumentNullException("not init"); if (this._transMaps.ContainsKey(methodName)) { return this._transMaps[methodName]; } return null; } public void Init(params string[] assembly) { this.Init(2, assembly); } public void Init(int threadNum, params string[] assembly) { if (assembly != null) { foreach (string s in assembly) { var ass = Assembly.Load(s); if (ass != null) { var types = ass.GetTypes(); foreach (var type in types) { var transAttr = type.GetCustomAttribute(typeof(TransactionAttribute), false) as TransactionAttribute; if (transAttr != null) { var methods = type.GetMethods(); foreach (var method in methods) { var forwardTrans = method.GetCustomAttribute(typeof(LogicTransAttribute), false) as LogicTransAttribute; var rollbackTrans = method.GetCustomAttribute(typeof(LogicRollBackTransAttribute), false) as LogicRollBackTransAttribute; TransactionUnit unit; if (forwardTrans != null) { if (!this._transMaps.TryGetValue(forwardTrans.MethodName, out unit)) { unit = new TransactionUnit(); } unit.Forward = method; unit.InstanceObject = Activator.CreateInstance(type); this._transMaps[forwardTrans.MethodName] = unit; } if (rollbackTrans != null) { if (!this._transMaps.TryGetValue(rollbackTrans.MethodName, out unit)) { unit = new TransactionUnit(); } unit.Rollback = method; unit.InstanceObject = Activator.CreateInstance(type); this._transMaps[rollbackTrans.MethodName] = unit; } } } } } } } TransQueueManage.Instance.Init( (t) => { BankTransLogBLL.Instance.Add(t); }, threadNum ); } }
View Code

为了友好开发者的调用,可以让其像使用SqlTransaction一样来使用,我又对外公开了一个CustomTranstion,将调用方式封装在这个类里面,代码如下:

public class CustomTransaction : IDisposable    {        private List
_dbTransactions; private bool _isRollBack = true; ///
/// 补偿机制 /// public List
Compensation; public void Commit() { if (this._dbTransactions != null) { this._dbTransactions.ForEach((t) => t.Commit()); } this._isRollBack = false; } public void RollBack() { if (this.Compensation != null) { this.Compensation.ForEach((t) => { object[] paramsArray = t.Argments == null ? null : t.Argments.ToArray(); t.Rollback.Invoke(t.InstanceObject, paramsArray); }); } if (this._dbTransactions != null) { this._dbTransactions.ForEach((t) => t.Rollback()); } } private bool _isRetry = true; public CustomTransaction(bool isRetry = true) { this._isRetry = isRetry; if (this._dbTransactions == null) { this._dbTransactions = new List
(); } if (this.Compensation == null) { this.Compensation = new List
(); } CallContext.SetData(TransKey.CustomTransKey, this); } public void AddDbTransaction(IDbTransaction transaction) { this._dbTransactions.Add(transaction); } public void Dispose() { if (this._isRollBack) { this.RollBack(); } CallContext.FreeNamedDataSlot(TransKey.CustomTransKey); } }
View Code

 这个时候,你就可以像是用SqlTransaction一样去Using(var trans = new CustomTranstion()){}然后在using里面去写trans.Commit();来提交所有的事务操作,如果不做Commit操作的话,在CustomTranstion里面,会自动去调用其rollback()操作。

但是这并没有完,所有的只是记录下来了,但是并没有保存到DB去做持久化。这个时候就需要增加一个队列,来不断的去将TransactionUnit来保存到db,同时又需要把队列去做持久化,避免一些意外原因,导致队列数据丢失,而缺失了这部分的日志记录(虽然我个人认为这一部分可以省略)。代码如下:

[Serializable]    public class TransQueue : IDisposable    {        public Queue
_transQueue; public Action
ExecuteAction; private Thread _thread; private bool _isDispose; public delegate void PersistenceHandler(Model.BankTransLog[] models); PersistenceHandler persistenceHandler; private readonly object _syncObject = new object(); public TransQueue() { if (_transQueue == null) { _transQueue = new Queue
(); } if (persistenceHandler == null) { persistenceHandler = PersistenceToDisk; } if (_thread == null) { _thread = new Thread(Thread_Work) { IsBackground = true }; } _thread.Start(); } public void Push(Model.BankTransLog model) { if (_transQueue == null) throw new ArgumentNullException("transQueue is not init"); lock (_syncObject) { _transQueue.Enqueue(model); } } public void Thread_Work() { while (!_isDispose) { Model.BankTransLog[] items = null; if (_transQueue != null && _transQueue.Count > 0) { lock (_syncObject) { items = new Model.BankTransLog[_transQueue.Count]; _transQueue.CopyTo(items, 0); _transQueue.Clear(); } } if (items != null && items.Length > 0) { persistenceHandler.BeginInvoke(items, PersistenceHandlerCallBack, persistenceHandler); foreach (var item in items) { if (ExecuteAction != null) { ExecuteAction.Invoke(item); } } } Thread.Sleep(1000); } } public void PersistenceHandlerCallBack(IAsyncResult result) { try { (result.AsyncState as PersistenceHandler).EndInvoke(result); } catch (Exception e) { } } public void PersistenceToDisk(Model.BankTransLog[] items) { try { BinaryHelper.SaveToFile(items); } catch (Exception e) { } } public void Dispose() { _isDispose = true; _thread.Join(); } }public class TransQueueManage { private int _threadNumber = 2; private TransQueue[] _transQueue; Random random = new Random(); public Action
ExecuteAction; private TransQueueManage() { } static TransQueueManage() { } public void Init(Action
action, int threadNum = 2) { if (_transQueue == null) { this._threadNumber = threadNum; _transQueue = new TransQueue[threadNum]; for (var i = 0; i < threadNum; i++) { _transQueue[i] = new TransQueue(); _transQueue[i].ExecuteAction = action; } } } private static readonly object _syncObject = new object(); private static TransQueueManage _instance; public static TransQueueManage Instance { get { if (_instance == null) { lock (_syncObject) { if (_instance == null) { _instance = new TransQueueManage(); } } } return _instance; } } public void Push(Model.BankTransLog model) { var index = GetRandomThreadIndex(); _transQueue[index].Push(model); } public int GetRandomThreadIndex() { return random.Next(0, this._threadNumber); } }
View Code

 

转载于:https://www.cnblogs.com/selfteam/p/4016054.html

你可能感兴趣的文章
day22 01 初识面向对象----简单的人狗大战小游戏
查看>>
递归函数,二分运算,正则表达式
查看>>
Flutter之内置动画(转)
查看>>
MySql优化相关概念的理解笔记
查看>>
数据库解决方案
查看>>
DataContract和DataMember的作用
查看>>
js如何获取response header信息
查看>>
python_文件的打开和关闭
查看>>
ADO.NET介绍
查看>>
iOS: 数据持久化方案
查看>>
【C#】【Thread】Monitor和Lock
查看>>
UVALive - 3635 - Pie(二分)
查看>>
集合类List,set,Map 的遍历方法,用法和区别
查看>>
Scala入门系列(十):函数式编程之集合操作
查看>>
pulseaudio的交叉编译
查看>>
Cracking The Coding Interview 1.1
查看>>
vb.net 浏览文件夹读取指定文件夹下的csv文件 并验证,显示错误信息
查看>>
NetworkInterface的使用
查看>>
元素自动居中显示
查看>>
JDBC 时间处理
查看>>