之前参与一个机票价格计算的项目,为他们设计了基本的处理流程,但是由于整个计算流程相当复杂,而且变化非常频繁,导致日常的修改、维护和升级也变得越来越麻烦,当我后来再接手的时候已经看不懂计算逻辑了。为了解决这个问题,我借鉴了“工作流”的思路,试图将整个计算过程设计成一个工作流。但是我又不想引入一个独立的工作流引擎,于是写了一个名为Pipelines的框架。顾名思义,Pipelines通过构建Pipeline的方式完成所需的处理流程,整个处理逻辑被分解并实现在若干Pipe中,这些Pipe按照指定的顺序将完成的Pipeline构建出来。Pipeline本质上就是一个简单的顺序工作流,它仅仅按序执行注册的Pipe。这个简单的Pipelines框架被放在这里,这里我不会介绍它的设计实现,只是简单地介绍它的用法,有兴趣的可以查看源代码。
一、构建并执行管道一、构建并执行管道二、Pipeline的“内部中断”三、Pipeline的“外部中断”四、处理层次化数据结构五、利用扩展方法时Pipeline构建更简洁
Pipelines旨在提供一个用于处理数据的顺序工作流或者管道(以下简称Pipeline),该Pipeline在一个强类型的上下文中被执行,管道可以利用此上下文得到需要处理的数据,并将处理的结果(含中间结果)存储在上下文中。接下来我们来演示如何利用Pipelines框架处理人口统计数据的实例。如下所示的两个类型分别表示人口统计数据和处理上下文,后者继承基类ContextBase。
(资料图片仅供参考)
public class PopulationData{ public object Statistics { get; set; } = default!;}public sealed class PopulationContext : ContextBase{ public PopulationContext(PopulationData data)=> Data = data; public PopulationData Data { get; }}
Pipeline由一系列Pipe对象按照注册的顺序组合而成。通过继承基类PipeBase
public sealed class FooPopulationPipe : PipeBase{ public override string Description => "Global PopulationProcessor Foo"; protected override void Invoke(PopulationContext context) =>Console.WriteLine($"{nameof(FooPopulationPipe)} is invoked.");}public sealed class BarPopulationPipe : PipeBase { public override string Description => "Global PopulationProcessor Bar"; protected override void Invoke(PopulationContext context) => Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked.");}public sealed class BazPopulationPipe : PipeBase { public override string Description => "Global PopulationProcessor Baz"; protected override void Invoke(PopulationContext context) => Console.WriteLine($"{nameof(BazPopulationPipe)} is invoked.");}
我设计Pipelines的初衷是让每个参与者(包含非技术人员)在代码的频繁迭代过程中,可以清晰地了解当前的处理流程,所以我会将当前应用构建的所有Pipeline的处理流程导出来。基于这个目的,每个Pipe类型都需要利用其Description属性提供一段描述当前处理逻辑的文本。Pipe具体的处理逻辑实现在重写的Invoke方法中。如果涉及异步处理,需要继承更上层的基类Pipe
Pipeline的构建实现在如下所示的BuildPipelines方法中,我们利用该方法提供的IPipelineProvider对象注册了一个命名为“PopulationProcessor”的Pipeline。具体来说,我们调用的是它的AddPipeline
using App;using Artech.Pipelines;var builder = WebApplication.CreateBuilder(args);builder.Services.AddPipelines(BuildPipelines);var app = builder.Build();app.MapGet("/test", async (IPipelineProvider provider, HttpResponse response) => { Console.WriteLine("Execute PopulationProcessor pipeline"); var context = new PopulationContext(new PopulationData()); var pipeline = provider.GetPipeline("PopulationProcessor"); await pipeline.ProcessAsync(context); return Results.Ok();});app.Run();static void BuildPipelines(IPipelineProvider pipelineProvider){ pipelineProvider.AddPipeline ( name: "PopulationProcessor", setup: builder => builder .Use () .Use () .Use ());}
Pipelines框架涉及的服务通过IServiceCollection接口的AddPipelines方法进行注册,BuildPipelines方法转换成委托作为该方法的参数。我们注册了一个指向“/test” 的路由终结点来演示针对管道的执行。如代码片段所示,我们利用注入的IPipelineProvider对象根据注册名称得到具体的Pipeline对象,并创建出相应的PopulationContext上下文作为参数来执行此Pipeline对象。程序执行后,请求路径”/pipelines”可以得到一个Pipeline的列表,点击具体的链接,对应Pipeline体现的流程就会呈现出来。
如果请求路径“/test”来执行构建的管道,管道执行的轨迹将会体现在控制台的输出结果上。
二、Pipeline的“内部中断”构成Pipeline的每个Pipe都可以根据处理逻辑的需要立即中断管道的执行。在如下这个重写的BarPopulationPipe类型的Invoke方法中,如果生成的随机数为偶数,它会调用上下文对象的Abort方法立即终止Pipeline的执行。
public sealed class BarPopulationPipe : PipeBase{ private readonly Random _random = new(); public override string Description => "Global PopulationProcessor Bar"; protected override void Invoke(PopulationContext context) { Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked."); if (_random.Next() % 2 == 0) { context.Abort();} }}
这样的化,当我们构建的Pipeline在执行过程中,有一半的几率BazPopulationPipe将不会执行,如下所示的输出结果体现了这一点。
对于继承自Pipe
public sealed class BarPopulationPipe : Pipe三、Pipeline的“外部中断”{ private readonly Random _random = new(); public override string Description => "Global PopulationProcessor Bar"; public override ValueTask InvokeAsync(PopulationContext context, Func next) { Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked."); if (_random.Next() % 2 != 0) { return next(context);} return ValueTask.CompletedTask; }}
在调用Pipeline时,我们可以利用执行上下文提供的CancellationToken中止Pipeline的执行。我们按照如下的方式再次改写了BarPopulationPipe的执行逻辑,如下面的代码片段所示,我们不再调用Abort方法,而是选择延迟2秒执行后续操作。
public sealed class BarPopulationPipe : Pipe{ private readonly Random _random = new(); public override string Description => "Global PopulationProcessor Bar"; public override async ValueTask InvokeAsync(PopulationContext context, Func next) { Console.WriteLine($"{nameof(BarPopulationPipe)} is invoked."); if (_random.Next() % 2 != 0) { await Task.Delay(2000);} await next(context); }}
我们按照如下的方式重写了PopulationContext的CancellationToken属性。我们为构造函数添加了两个参数,一个代表当前HttpContext上下文,另一个表示设置的超时时限。CancellationToken根据这两个参数创建而成,意味着管道不仅具有默认的超时时间,也可以通过HTTP调用方中止执行。
public sealed class PopulationContext: ContextBase{ public PopulationContext(PopulationData data, HttpContext httpContext, TimeSpan timeout) { Data = data; CancellationToken = CancellationTokenSource.CreateLinkedTokenSource(httpContext.RequestAborted, new CancellationTokenSource(timeout).Token).Token; } public PopulationData Data { get; } public override CancellationToken CancellationToken { get; }}
在注册的终结点处理器中,我们在执行Pipeline之前,将作为参数传入的PopulationContext上下文的超时时间设置为1秒。
var builder = WebApplication.CreateBuilder(args);builder.Services.AddPipelines(BuildPipelines);var app = builder.Build();app.MapGet("/test", async (HttpContext httpContext,IPipelineProvider provider, HttpResponse response) => { Console.WriteLine("Execute PopulationProcessor pipeline"); var context = new PopulationContext(new PopulationData(), httpContext, TimeSpan.FromSeconds(1)); var pipeline = provider.GetPipeline("PopulationProcessor"); await pipeline.ProcessAsync(context); return Results.Ok();});app.Run();
根据BarPopulationPipe的执行逻辑,Pipeline的执行具有一半的几率会超时,一旦超时就会立即抛出一个OperationCancellationToken异常。
四、处理层次化数据结构Pipelines设计的主要目的是用来处理层次化的数据结构,这涉及到子Pipeline的应用。目前我们处理的人口数据体现为一个简单的数据类型,现在我们让它变得更复杂一些。假设我们需要处理国家、省份和城市三个等级的人口数据,其中StatePopulationData代表整个国家的人口数据,它的Provinces属性承载了每个省份的数据。ProvincePopulationData代表具体某个省份的人口数据,其Cities属性承载了每个城市的人口数据。
public class PopulationData{ public object Statistics { get; set; } = default!;}public class StatePopulationData{ public IDictionaryProvinces { get; set; } = default!;}public class ProvincePopulationData{ public IDictionary Cities { get; set; } = default!;}
现在我们需要构建一个Pipeline来处理通过StatePopulationData类型表示的整个国家的人口数据,具体的处理流程为:
利用FooStatePipe处理国家人口数据利用BarStatePipe处理国家人口数据构建子Pipeline处理每个省份人口数据,子Pipeline处理逻辑:利用FooProvincePipe处理省份人口数据 利用BarProvincePipe处理省份人口数据、构建子Pipeline处理每个城市人口数据,子Pipeline处理逻辑利用FooCityPipe处理城市人口数据利用BarCityPipe处理城市人口数据利用BazCityPipe处理城市人口数据利用BazProvincePipe处理省份人口数据利用BazStatePipe处理国家人口数据为此我们需要定义9个Pipe类型,以及3个执行上下文。如下所示的是三个执行上下文类型的具体定义:
public sealed class StatePopulationContext: ContextBase{ public StatePopulationData PopulationData { get; } public StatePopulationContext(StatePopulationData populationData) => PopulationData = populationData;}public sealed class ProvincePopulationContext : SubContextBase>{ public string Province { get; private set; } = default!; public IDictionary Cities { get; private set; } = default!; public override void Initialize(StatePopulationContext parent, KeyValuePair item) { Province = item.Key; Cities = item.Value.Cities; base.Initialize(parent, item); }}public sealed class CityPopulationContext: SubContextBase >{ public string City { get; private set; } = default!; public PopulationData PopulationData { get; private set; } = default!; public override void Initialize(ProvincePopulationContext parent, KeyValuePair item) { City = item.Key; PopulationData = item.Value; base.Initialize(parent, item);}}
9个对应的Pipe类型定义如下。每个类型利用重写的Description提供一个简单的描述,重写的Invoke方法输出当前怎样的数据(那个省/市的人口数据)。
public sealed class FooStatePipe : PipeBase用于构建这个Pipeline的BuildPipelines方法根据构建的Pipeline结构进行了如下的改写:子Pipeline通过IPipelineBuilder{ public override string Description => "State Population Processor Foo"; protected override void Invoke(StatePopulationContext context)=>Console.WriteLine("Foo: Process state population");}public sealed class BarStatePipe : PipeBase { public override string Description => "State Population Processor Bar"; protected override void Invoke(StatePopulationContext context) => Console.WriteLine("Bar: Process state population");}public sealed class BazStatePipe : PipeBase { public override string Description => "State Population Processor Baz"; protected override void Invoke(StatePopulationContext context) => Console.WriteLine("Baz: Process state population");}public sealed class FooProvincePipe : PipeBase { public override string Description => "Province Population Processor Foo"; protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tFoo: Process population of the province {context.Province}");}public sealed class BarProvincePipe : PipeBase { public override string Description => "Province Population Processor Bar"; protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tBar: Process population of the province {context.Province}");}public sealed class BazProvincePipe : PipeBase { public override string Description => "Province Population Processor Baz"; protected override void Invoke(ProvincePopulationContext context) => Console.WriteLine($"\tBaz: Process population of the province {context.Province}");}public sealed class FooCityPipe : PipeBase { public override string Description => "City Population Processor Foo"; protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tFoo: Process population of the city {context.City}");}public sealed class BarCityPipe : PipeBase { public override string Description => "City Population Processor Bar"; protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tBar: Process population of the city {context.City}");}public sealed class BazCityPipe : PipeBase { public override string Description => "City Population Processor Baz"; protected override void Invoke(CityPopulationContext context) => Console.WriteLine($"\t\tBaz: Process population of the city {context.City}");}
static void BuildPipelines(IPipelineProvider pipelineProvider){ pipelineProvider.AddPipeline(name: "PopulationProcessor", setup: builder => builder .Use () .Use () .ForEach >( description: "For each province", collectionAccessor: context => context.PopulationData.Provinces, subPipelineSetup: provinceBuilder => provinceBuilder .Use () .Use () .ForEach >( description: "For each city", collectionAccessor: context => context.Cities, subPipelineSetup: cityBuilder => cityBuilder .Use () .Use () .Use ()) .Use ()) .Use ());}
终结点处理程序在执行新的Pipeline时,会按照如下的形式将StatePopulationContext上下文构建出来。处理人口数据涉及三个省份(江苏、山东和浙江),每个省份包含三个城市的人口数据。
var builder = WebApplication.CreateBuilder(args);builder.Services.AddPipelines(BuildPipelines);var app = builder.Build();app.MapGet("/test", async (HttpContext httpContext, IPipelineProvider provider, HttpResponse response) => { Console.WriteLine("Execute PopulationProcessor pipeline"); var data = new StatePopulationData { Provinces = new Dictionary() }; data.Provinces.Add("Jiangsu", new ProvincePopulationData { Cities = new Dictionary { {"Suzhou", new PopulationData() }, {"Wuxi", new PopulationData() }, {"Changezhou", new PopulationData() }, } }); data.Provinces.Add("Shandong", new ProvincePopulationData { Cities = new Dictionary { {"Qingdao", new PopulationData() }, {"Jinan", new PopulationData() }, {"Yantai", new PopulationData() }, } }); data.Provinces.Add("Zhejiang", new ProvincePopulationData { Cities = new Dictionary { {"Hangzhou", new PopulationData() }, {"Ningbo", new PopulationData() }, {"Wenzhou", new PopulationData() }, } }); var context = new StatePopulationContext(data); var pipeline = provider.GetPipeline ("PopulationProcessor"); await pipeline.ProcessAsync(context); return Results.Ok();});app.Run();
应用启动后,我们依然可以从Pipeline导出页面看到整个Pipeline的处理流程。
当我们请求“/test”,Pipeline针对国家人口数据的执行流程体现在控制台输出上。
五、利用扩展方法时Pipeline构建更简洁Pipeline的构建过程体现了完整的处理流程,所以我们应该构建代码尽可能地简洁,最理想的状态就是让非技术人员也能看懂。Pipelines提供的用于注册Pipe的API均为泛型方法,并且会涉及两到三个必须显式指定的泛型参数,使用起来还不是很方便。不过这个问题可以通过自定义扩展方法来解决。
public static class Extensions{ public static IPipelineBuilderUseStatePipe (this IPipelineBuilder builder) where TPipe : Pipe => builder.Use (); public static IPipelineBuilder UseProvincePipe (this IPipelineBuilder builder) where TPipe : Pipe => builder.Use (); public static IPipelineBuilder UseCityPipe (this IPipelineBuilder builder) where TPipe : Pipe => builder.Use (); public static IPipelineBuilder ForEachProvince(this IPipelineBuilder builder, Action > setup) => builder.ForEach("For each province", it => it.PopulationData.Provinces, (_, _) => true, setup); public static IPipelineBuilder ForEachCity(this IPipelineBuilder builder, Action > setup) => builder.ForEach("For each city", it => it.Cities, (_, _) => true, setup);}
如上面的代码片段所示,我们针对三个数据层次(国家、省份、城市)定义了注册对应Pipe的扩展方法UseStatePipe、UseProvincePipe和UseCityPipe。还分别定义了ForEachProvince和ForEachCity这两个扩展方法来注册构建处理省份/城市人口数据的子Pipeline。有了这5个扩展方法,构建整个Pipeline的代码就可以变得非常简单而清晰,即使不写任何的注释,相信每个人(包括非开发人员)都能读懂涉及的处理流程。
static void BuildPipelines(IPipelineProvider pipelineProvider){ pipelineProvider.AddPipeline(name: "PopulationProcessor", setup: builder => builder .UseStatePipe () .UseStatePipe () .ForEachProvince(provinceBuilder => provinceBuilder .UseProvincePipe () .UseProvincePipe () .ForEachCity(cityBuilder => cityBuilder .UseCityPipe () .UseCityPipe () .UseCityPipe ()) .UseProvincePipe ()) .UseStatePipe ());}
标签:
仓储物流“成渝圈”如何乘势而上? 12月3日,连接昆明和万象的中老铁路全线开通运营,被惠及的显...
两件西周青铜簋时隔三千年成功配对 考古工作者介绍,这个铜簋的盖、身分别时隔40余年出土,纹饰...
“医保砍价”不是一个人在战斗 晁星 “我眼泪都快掉下来了”“每一个小群体都不该被放弃”…...
“购物成瘾”真的是一种病 刘艳 牛雅娟 本周日即将迎来“双十二”促销季,很多人又开始摩拳...
因迷恋山间风景,一男子在甘孜州稻城县海拔4000多米的无人区迷失方向,随后与同伴失联。12月的稻城...
嫌疑人DNA信息比中后,成都市公安局刑侦支队技术处DNA实验室民警白小刚一下坐在凳子上,恍惚迟疑间...
一批反映南京大屠杀历史的新书发布 新华社南京12月7日电(记者邱冰清、蒋芳)“以史为鉴,开创未来...
我在现场·照片背后的故事|电影《亲爱的》里面没有的结局,在我眼前“上映” 12月6日,在深圳市...
冥想?泡脚?不如听听助眠音乐 晚上睡不着,白天睡不醒,成为最贴合都市人群的“睡眠画像”。随...
养老话题 老年教育面临缺口 “终身教育”潜力无限 【现实挑战】“新老年”群体愿意在培养兴...
孙海洋被拐14年儿子如何找到的? 警方侦办另一宗拐骗儿童案时发现线索,通过人像比对、DNA确认找...
北京天文馆、圆明园将对未成年人免费开放 12月6日,北京天文馆发布通知称,12月8日起试行对未成...
今年全国粮食总产量再创新高 连续7年保持在1 3万亿斤以上 根据对全国31个省(区、市)的抽样调...
斑块软的很危险 硬的就无碍? 血管里的“垃圾”分类 赶快学起来! 一项最新研究显示:中国...
诺西那生钠注射液大幅降价 聚焦医保谈判背后脊髓性肌萎缩症家庭 医保目录公布那天 好多家长都...
抖音“窗花剪剪”遭抄袭 被判获赔20万元 法院认为“窗花剪剪”的这种表达方式理应受到《著作权...
公安机关近日侦破3起拐卖儿童案件 失散十几年 3组家庭终于团圆了 北京青年报记者12月6日从公...
2021年度十大网络用语发布 本报讯(记者 路艳霞)作为年度“汉语盘点”活动最具网络特色的组成部...
北京天文馆向未成年人免费开放 本报讯(记者 牛伟坤)北京天文馆对票价免费及优惠政策作出调整:1...
2021北京百个网红打卡地发布 本报讯(记者 李洋)2021北京网红打卡地推荐榜单昨晚正式发布。自然...