业务脚本:为什么说可编程订阅式缓存服务更有用?
我们已经习惯了使用缓存集群对数据做缓存。然而,这种常见的内存缓存服务存在诸多不便之处。首先,集群会独占大量的内存。这意味着在资源有限的情况下,可能会对其他系统资源的分配造成压力,影响整体系统的性能和稳定性。
其次,不能原子修改缓存的某一个字段。在一些对数据一致性要求较高的场景中,这可能会引发数据不一致的问题,从而影响业务的正常运行。
再者,多次通讯有网络损耗。尤其是在频繁进行数据交互的情况下,网络损耗可能会导致数据传输延迟增加,降低系统的响应速度。
另外,很多时候我们获取数据并不需要全部字段,但因为缓存不支持筛选,在批量获取数据的场景下性能就会下降很多。这种情况在数据量较大时尤为明显,会严重影响系统的效率。
而这些问题在读多写多的场景下,会更加突出。那么,有什么方式能够解决这些问题呢?
缓存即服务
可编程订阅式缓存服务意味着我们能够自行实现一个数据缓存服务,进而直接提供给业务服务使用。这种实现方式具有独特的优势,它可以依据业务的具体需求,主动地对数据进行缓存,并且能够提供一些数据整理以及计算的服务。
虽然自行实现这样的数据缓存服务过程较为繁琐,然而其带来的优势却是显著的。除了吞吐能力能够得到提升之外,我们还可以实现众多有趣的定制功能。比如,我们可以根据业务的特定逻辑对数据进行个性化的整理和优化,使其更符合业务的使用场景。
同时,它还具备更好的计算能力。这使得我们在处理数据时,能够更加高效地进行各种复杂的计算操作,为业务提供更强大的数据支持。
甚至,它可以让我们的缓存直接对外提供基础数据的查询服务。这样一来,业务在获取基础数据时无需再通过繁琐的流程从其他数据源获取,大大提高了数据获取的效率和便捷性,进一步提升了整个业务系统的性能和灵活性。
上图展示了一个自实现的缓存功能结构。不得不说,这种缓存的性能和效果更为出色,究其原因在于它对数据的处理方式与传统模式大相径庭。
在传统模式下,缓存服务并不会对数据进行任何加工处理,所保存的是系列化的字符串。在这种情况下,大部分的数据无法直接进行修改。当我们利用这种缓存对外提供服务时,业务服务不得不将所有数据取出至本地内存,随后进行遍历加工才能投入使用。
然而,可编程缓存却能够将数据结构化地存储在 map 中。相较于传统模式下序列化的字符串,它更为节省内存。更为便捷的是,我们的服务无需再从其他服务获取数据来进行计算,如此便能够节省大量网络交互所耗费的时间,因而非常适合应用于对实时要求极高的场景之中。
倘若我们的热数据量颇为庞大,那么可以结合 RocksDB 等嵌入式引擎,凭借有限的内存来为大量数据提供服务。除了常规的数据缓存服务之外,可编程缓存还具备诸多强大的功能,比如支持对缓存数据的筛选过滤、统计计算、查询、分片以及数据拼合。
在此,关于查询服务,我要补充说明一下。对于对外的服务,建议通过类似 Redis 的简单文本协议来提供服务,因为这样相较于 HTTP 协议,其性能会更为优越。
Lua 脚本引擎
虽然缓存提供业务服务能够提升业务的灵活度,然而这种方式也存在诸多缺点。其中最大的缺点便是业务修改后,我们需要重启服务才能够更新我们的逻辑。由于内存中存储了大量的数据,每重启一次,数据就需要经历繁琐的预热过程,同步代价极为高昂。
为此,我们需要对设计进行再次升级。在这种情况下,lua 脚本引擎不失为一个上佳的选择。lua 是一种小巧的嵌入式脚本语言,借助它能够实现一个高性能、可热更新的脚本服务,进而与嵌入的服务进行高效灵活的互动。
我绘制了一张示意图,用以描述如何通过 lua 脚本来具体实现可编程缓存服务:
上图所示,可以看到我们提供了 Kafka 消费、周期任务管理、内存缓存、多种数据格式支持、多种数据驱动适配这些服务。不仅仅如此,为了减少由于逻辑变更导致的服务经常重启的情况,我们还以性能损耗为代价,在缓存服务里嵌入了 lua 脚本引擎,借此实现动态更新业务的逻辑。lua 引擎使用起来很方便,我们结合后面这个实现例子看一看,这是一个 Go 语言写的嵌入 lua 实现,代码如下所示:
import "github.com/yuin/gopher-lua"
// VarChange 用于被lua调用的函数
func VarChange(L *lua.LState) int {
lv := L.ToInt(1) //获取调用函数的第一个参数,并且转成int
L.Push(lua.LNumber(lv * 2)) //将参数内容直接x2,并返回结果给lua
return 1 //返回结果参数个数
}
func main() {
L := lua.NewState() //新lua线程
defer L.Close() //程序执行完毕自动回收
// 注册lua脚本可调用函数
// 在lua内调用varChange函数会调用这里注册的Go函数 VarChange
L.SetGlobal("varChange", L.NewFunction(VarChange))
//直接加载lua脚本
//脚本内容为:
// print "hello world"
// print(varChange(20)) # lua中调用go声明的函数
if err := L.DoFile("hello.lua"); err != nil {
panic(err)
}
// 或者直接执行string内容
if err := L.DoString(`print("hello")`); err != nil {
panic(err)
}
}
// 执行后输出结果:
//hello world
//40
//hello
从这个例子里我们可以看出,lua 引擎是可以直接执行 lua 脚本的,而 lua 脚本可以和 Golang 所有注册的函数相互调用,并且可以相互传递交换变量。回想一下,我们做的是数据缓存服务,所以需要让 lua 能够获取修改服务内的缓存数据,那么,lua 是如何和嵌入的语言交换数据的呢?我们来看看两者相互调用交换的例子:
import (
"fmt"
"github.com/yuin/gopher-lua"
)
func main() {
L := lua.NewState()
defer L.Close()
//加载脚本
err := L.DoFile("vardouble.lua")
if err != nil {
panic(err)
}
// 调用lua脚本内函数
err = L.CallByParam(lua.P{
Fn: L.GetGlobal("varDouble"), //指定要调用的函数名
NRet: 1, // 指定返回值数量
Protect: true, // 错误返回error
}, lua.LNumber(15)) //支持多个参数
if err != nil {
panic(err)
}
//获取返回结果
ret := L.Get(-1)
//清理下,等待下次用
L.Pop(1)
//结果转下类型,方便输出
res, ok := ret.(lua.LNumber)
if !ok {
panic("unexpected result")
}
fmt.Println(res.String())
}
// 输出结果:
// 30
其中 vardouble.lua 内容为:
function varDouble(n)
return n * 2
end
过这个方式,lua 和 Golang 就可以相互交换数据和相互调用。对于这种缓存服务普遍要求性能很好,这时我们可以统一管理加载过 lua 的脚本及 LState 脚本对象的实例对象池,这样会更加方便,不用每调用一次 lua 就加载一次脚本,方便获取和使用多线程、多协程。
Lua 脚本统一管理
从前面所做的讲解当中,我们能够察觉到,在实际进行使用的时候,lua 会有许多实例在内存之中运行着。
为了能够对这些实例实现更为妥善的管理,并且进一步提升效率,我们最为理想的做法便是运用一个专门的脚本管理系统,来对所有 lua 的运行实例展开管理。通过这样的操作,便可以达成对脚本进行统一更新、实现编译缓存、完成资源调度以及对单例加以控制等一系列目标。
lua 脚本其自身特性是单线程的,不过它的体量非常轻,单个实例所造成的内存损耗大概仅为 144kb 左右。在一些服务当中,平常运行的时候甚至能够同时开启成百上千个 lua 实例。
倘若要提高服务的并行处理能力,我们可以选择启动多个协程,让每一个协程都能够独立去运行一个 lua 线程。
正是出于这样的需求,gopher-lua 库为我们提供了一种类似于线程池的实现方式。借助于这种方式,我们就不再需要频繁地去创建以及关闭 lua 了,其官方所给出的具体例子如下:
//保存lua的LState的池子
type lStatePool struct {
m sync.Mutex
saved []*lua.LState
}
// 获取一个LState
func (pl *lStatePool) Get() *lua.LState {
pl.m.Lock()
defer pl.m.Unlock()
n := len(pl.saved)
if n == 0 {
return pl.New()
}
x := pl.saved[n-1]
pl.saved = pl.saved[0 : n-1]
return x
}
//新建一个LState
func (pl *lStatePool) New() *lua.LState {
L := lua.NewState()
// setting the L up here.
// load scripts, set global variables, share channels, etc...
//在这里我们可以做一些初始化
return L
}
//把Lstate对象放回到池中,方便下次使用
func (pl *lStatePool) Put(L *lua.LState) {
pl.m.Lock()
defer pl.m.Unlock()
pl.saved = append(pl.saved, L)
}
//释放所有句柄
func (pl *lStatePool) Shutdown() {
for _, L := range pl.saved {
L.Close()
}
}
// Global LState pool
var luaPool = &lStatePool{
saved: make([]*lua.LState, 0, 4),
}
//协程内运行的任务
func MyWorker() {
//通过pool获取一个LState
L := luaPool.Get()
//任务执行完毕后,将LState放回pool
defer luaPool.Put(L)
// 这里可以用LState变量运行各种lua脚本任务
//例如 调用之前例子中的的varDouble函数
err = L.CallByParam(lua.P{
Fn: L.GetGlobal("varDouble"), //指定要调用的函数名
NRet: 1, // 指定返回值数量
Protect: true, // 错误返回error
}, lua.LNumber(15)) //这里支持多个参数
if err != nil {
panic(err) //仅供演示用,实际生产不推荐用panic
}
}
func main() {
defer luaPool.Shutdown()
go MyWorker() // 启动一个协程
go MyWorker() // 启动另外一个协程
/* etc... */
}
通过这个方式我们可以预先创建一批 LState,让它们加载好所有需要的 lua 脚本,当我们执行 lua 脚本时直接调用它们,即可对外服务,提高我们的资源复用率。
变量的交互
实际上,我们的数据既能够存储在 lua 当中,也可以存放在 Go 里面,然后通过相互调用的方式来获取对方所存储的数据。就我个人而言,更习惯把数据放在 Go 中进行封装处理,之后供 lua 来调用。之所以会这样选择,主要是因为这种做法相对更加规范,在管理方面也会比较便捷,毕竟脚本在运行过程中是会存在一定损耗的。
前面也曾提到过,我们会采用 struct 和 map 相互组合的方式来对一些数据进行处理,进而对外提供数据服务。那么,lua 和 Golang 之间究竟是如何实现像 struct 这一类数据的交换呢?
在这里,我选取了官方所提供的例子,并且还额外添加了大量的注释内容,其目的就是为了能够更好地帮助大家去理解这两者之间的数据交互过程。
// go用于交换的 struct
type Person struct {
Name string
}
//为这个类型定义个类型名称
const luaPersonTypeName = "person"
// 在LState对象中,声明这种类型,这个只会在初始化LState时执行一次
// Registers my person type to given L.
func registerPersonType(L *lua.LState) {
//在LState中声明这个类型
mt := L.NewTypeMetatable(luaPersonTypeName)
//指定 person 对应 类型type 标识
//这样 person在lua内就像一个 类声明
L.SetGlobal("person", mt)
// static attributes
// 在lua中定义person的静态方法
// 这句声明后 lua中调用person.new即可调用go的newPerson方法
L.SetField(mt, "new", L.NewFunction(newPerson))
// person new后创建的实例,在lua中是table类型,你可以把table理解为lua内的对象
// 下面这句主要是给 table定义一组methods方法,可以在lua中调用
// personMethods是个map[string]LGFunction
// 用来告诉lua,method和go函数的对应关系
L.SetField(mt, "__index", L.SetFuncs(L.NewTable(), personMethods))
}
// person 实例对象的所有method
var personMethods = map[string]lua.LGFunction{
"name": personGetSetName,
}
// Constructor
// lua内调用person.new时,会触发这个go函数
func newPerson(L *lua.LState) int {
//初始化go struct 对象 并设置name为 1
person := &Person{L.CheckString(1)}
// 创建一个lua userdata对象用于传递数据
// 一般 userdata包装的都是go的struct,table是lua自己的对象
ud := L.NewUserData()
ud.Value = person //将 go struct 放入对象中
// 设置这个lua对象类型为 person type
L.SetMetatable(ud, L.GetTypeMetatable(luaPersonTypeName))
// 将创建对象返回给lua
L.Push(ud)
//告诉lua脚本,返回了数据个数
return 1
}
// Checks whether the first lua argument is a *LUserData
// with *Person and returns this *Person.
func checkPerson(L *lua.LState) *Person {
//检测第一个参数是否为其他语言传递的userdata
ud := L.CheckUserData(1)
// 检测是否转换成功
if v, ok := ud.Value.(*Person); ok {
return v
}
L.ArgError(1, "person expected")
return nil
}
// Getter and setter for the Person#Name
func personGetSetName(L *lua.LState) int {
// 检测第一个栈,如果就只有一个那么就只有修改值参数
p := checkPerson(L)
if L.GetTop() == 2 {
//如果栈里面是两个,那么第二个是修改值参数
p.Name = L.CheckString(2)
//代表什么数据不返回,只是修改数据
return 0
}
//如果只有一个在栈,那么是获取name值操作,返回结果
L.Push(lua.LString(p.Name))
//告诉会返回一个参数
return 1
}
func main() {
// 创建一个lua LState
L := lua.NewState()
defer L.Close()
//初始化 注册
registerPersonType(L)
// 执行lua脚本
if err := L.DoString(`
//创建person,并设置他的名字
p = person.new("Steven")
print(p:name()) -- "Steven"
//修改他的名字
p:name("Nico")
print(p:name()) -- "Nico"
`); err != nil {
panic(err)
}
}
可以看到,我们通过 lua 脚本引擎就能很方便地完成相互调用和交换数据,从而实现很多实用的功能,甚至可以用少量数据直接写成 lua 脚本的方式来加载服务。
缓存预热与数据来源
在对 lua 有了一定了解之后,接下来我们一起探讨一下服务是如何加载数据的。
当服务启动之时,我们首先要做的就是将数据缓存加载到缓存当中,以此来完成缓存预热的操作。只有在数据全部加载完成之后,才会开放对外的 API 端口,从而正式对外提供服务。
在这个加载数据的过程中,如果引入了 lua 脚本的话,那么就能够在服务启动之际,针对不同格式的数据开展适配加工的工作。如此一来,数据的来源也会变得更加丰富多样。
通常情况下,常见的数据来源是由大数据挖掘周期所生成的全量数据离线文件。这些文件会通过 NFS 或者 HDFS 进行挂载操作,并且会定期进行刷新,以便加载最新的文件。这种通过挂载离线文件来获取数据的方式,比较适合那些数据量庞大且更新速度较为缓慢的数据。不过,它也存在一定的缺点,那就是在加载数据的时候需要对数据进行整理工作。要是情况较为复杂的话,比如对于 800M 大小的数据,可能就需要花费 1 至 10 分钟的时间才能够完成加载操作。
除了采用上述这种利用文件获取数据的方式之外,我们还可以在程序启动之后,通过扫描数据表的方式来恢复数据。但是这样做的话,数据库将会承受一定的压力,所以建议使用专门的从库来进行此项操作。而且需要注意的是,相较于通过磁盘离线文件获取数据的方式,这种扫描数据表的方式其加载速度会更慢一些。
前面所提及的那两种数据加载方式,在速度方面都存在一定的不足。接下来,我们还可以考虑另外一种做法,那就是将 RocksDB 嵌入到进程之中。通过这样的操作,能够极大幅度地提升我们的数据存储容量,进而实现内存与磁盘之间高性能的读取和写入操作。不过,这么做也是有代价的,那就是相对而言会使得查询性能出现一定程度的降低。
关于 RocksDB 的数据获取,我们可以借助大数据来生成符合 RocksDB 格式的数据库文件,然后将其拷贝给我们的服务,以便服务能够直接进行加载。采用这种方式的好处在于,它可以大幅减少系统在启动过程中整理以及加载数据所耗费的时间,从而能够实现更多的数据查询操作。
另外,倘若我们存在对于本地有关系数据进行查询的需求,那么还可以选择嵌入 SQLite 引擎。通过这个引擎,我们就能够开展各种各样的关系数据查询工作。对于 SQLite 的数据生成,同样也可以利用相关工具提前完成,生成之后便可以直接提供给我们的服务使用。但在这里需要特别注意的是,这个数据库的数据量最好不要超过 10 万条,不然的话,很有可能会导致服务出现卡顿的现象。
最后,在涉及离线文件加载的情况时,我们最好能够制作一个类似于 CheckSum 的文件。制作这个文件的目的在于,在加载文件之前,可以利用它来检查文件的完整性。因为我们所使用的是网络磁盘,所以不太能确定这个文件是否正在处于拷贝的过程当中,这就需要运用一些小技巧来确保我们的数据完整性。其中,最为简单粗暴的方式就是,在每次拷贝完毕之后,生成一个与原文件同名的文件,并且在这个新生成的文件内部记录一下它的 CheckSum 值,这样一来,便方便我们在加载文件之前进行校验操作。
离线文件在实际应用当中具有一定的优势,它能够帮助我们快速实现多个节点之间的数据共享以及数据的统一。倘若我们要求多个节点的数据要保持最终的一致性,那么就需要通过离线与同步订阅相结合的方式来实现数据的同步操作。
订阅式数据同步及启动同步
那么,咱们的数据究竟是怎样实现同步更新的呢?通常而言,我们的数据是从多个基础数据服务获取而来的。要是希望能够实时同步数据所发生的更改情况,一般会采用订阅 binlog 的方式,先将变更的相关信息同步到 Kafka 当中。接着,再凭借 Kafka 的分组消费功能,去通知那些分布在不同集群里面的缓存。
当收到消息变更的服务接收到相关通知后,就会触发 lua 脚本,进而依据脚本对数据展开同步更新的操作。通过 lua 脚本的运作,我们能够以触发式的方式同步更新其他与之相关的缓存。
比如说,当用户购买了一个商品的时候,我们就需要同步刷新与之相关的一些数据,像是他的积分情况、订单信息以及消息列表的个数等等,以此来确保各项数据的一致性和实时性。
周期任务
在谈及任务管理这个话题时,就不得不着重说一说周期任务了。周期任务在通常情况下,主要是被用于对数据统计进行刷新的工作。我们只要将周期任务和 lua 自定义逻辑脚本相互结合起来加以运用,便能够轻松实现定期开展统计的操作,这无疑是给我们的工作带来了更多的便利条件。
在执行定期任务或者进行延迟刷新的这个过程当中,较为常见的一种处理方式就是运用时间轮来对任务加以管理。通过采用这种方式,能够把定时任务转化为事件触发的形式,如此一来,就可以非常便捷地对内存当中那些有待触发的任务列表进行管理了,进而能够实现并行处理多个周期任务,也就无需再通过使用 sleep 循环这种方式去不断地进行查询操作了。要是您对时间轮的具体实现方式感兴趣的话,可以直接点击此处进行查看哦。
另外,前面也曾提到过,我们所用到的很多数据都是借助离线文件来进行批量更新的。假如是每一小时就更新一次数据的话,那么在这一小时之内新更新的数据就需要进行同步处理。一般而言,处理的方式是这样的:当我们的服务启动并加载离线文件的时候,要把这个离线文件生成的时间给保存下来,然后凭借这个保存下来的时间来对数据更新队列当中的消息进行过滤操作。等到我们的队列任务进度追赶至接近当前时间的时候,再开启对外提供数据的服务。
总结
在那些读多写多的服务当中,实时交互类的服务数量相当多,并且这类服务对于数据的实时性有着很高的要求。若是采用集中型缓存的方式,往往很难满足此类服务所提出的各项需求。
基于这样的情况,在行业当中,多数会选择通过服务自身内存中的数据来为实时交互服务提供支持。然而,这种做法存在一个明显的弊端,那就是维护起来特别麻烦,一旦服务重启,就必须要对数据进行恢复操作。
为了能够实现业务逻辑在无需重启的情况下就可以完成更新,在行业里通常会采用内嵌脚本的热更新方案。在众多的脚本引擎当中,较为常见且通用的就是 lua 脚本引擎了。lua 是一种非常流行且使用起来极为方便的脚本引擎,在行业领域中,许多知名的游戏以及各类服务都借助 lua 来实现高性能服务的定制化业务功能,像 Nginx、Redis 等就是如此。
当我们把 lua 和我们所定制的缓存服务相互结合起来的时候,便能够打造出很多强大的功能,以此来应对各种各样不同的场景需求。
由于 lua 具备十分节省内存的特性,所以我们可以在进程当中开启数量多达成千上万的 lua 小线程。甚至可以做到为每一个用户都配备一个 LState 线程,从而为客户端提供类似于状态机一样的服务。
通过运用上述所提到的这些方法,再将 lua 和静态语言之间进行数据交换以及相互调用,并且配合上我们所实施的任务管理以及各种各样的数据驱动方式,这样就能够构建出一个几乎可以应对所有情况的万能缓存服务了。
在此,推荐大家在一些小型项目当中亲自去实践一下上述的这些内容。相信通过这样的实践,会让大家从一个与以往不同的视角去重新审视那些已经习惯了的服务,如此一来,大家必定会从中获得更多的收获。