Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4fd262beae | ||
|
|
239115ead8 | ||
|
|
02b3e275d0 | ||
|
|
9810e55a15 | ||
|
|
03e4ad5db5 | ||
|
|
eb94e50f02 | ||
|
|
26444905cf |
@@ -26,6 +26,7 @@ var (
|
|||||||
Name = "game_act"
|
Name = "game_act"
|
||||||
ActList = gset.New(true)
|
ActList = gset.New(true)
|
||||||
RunTimeMax *gtime.Time
|
RunTimeMax *gtime.Time
|
||||||
|
TaskMax int64 = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
type sGameAct struct {
|
type sGameAct struct {
|
||||||
@@ -77,7 +78,7 @@ func (s *sGameAct) Info(uid int64, actId int) (data *g.Var, err error) {
|
|||||||
_, err = g.Redis().Set(ctx, keyCache, data)
|
_, err = g.Redis().Set(ctx, keyCache, data)
|
||||||
|
|
||||||
var CacheKey = fmt.Sprintf("act:update:%d", uid)
|
var CacheKey = fmt.Sprintf("act:update:%d", uid)
|
||||||
pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*1)
|
pkg.Cache("redis").Set(ctx, CacheKey, uid, time.Hour*24*3+time.Hour*12)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -236,7 +237,7 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//删除缓存
|
//删除缓存
|
||||||
go s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
|
|
||||||
updateCount++
|
updateCount++
|
||||||
update = make([]*entity.GameAct, 0)
|
update = make([]*entity.GameAct, 0)
|
||||||
@@ -264,10 +265,10 @@ func (s *sGameAct) Save(ctx context.Context, actId int) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, v2 := range add {
|
//for _, v2 := range add {
|
||||||
//删除缓存
|
// //删除缓存
|
||||||
go s.DelCacheKey(ctx, v2.ActId, v2.Uid)
|
// s.DelCacheKey(ctx, v2.ActId, v2.Uid)
|
||||||
}
|
//}
|
||||||
|
|
||||||
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
//g.Log().Debugf(ctx, "当前 %v 写入数据库: %v 条", actId, count)
|
||||||
add = make([]*entity.GameAct, 0)
|
add = make([]*entity.GameAct, 0)
|
||||||
@@ -311,7 +312,7 @@ func (s *sGameAct) SavesV2() (err error) {
|
|||||||
go func() {
|
go func() {
|
||||||
scanErr := tools.Redis.RedisScanV2("act:*", func(keys []string) error {
|
scanErr := tools.Redis.RedisScanV2("act:*", func(keys []string) error {
|
||||||
if gtime.Now().After(RunTimeMax) {
|
if gtime.Now().After(RunTimeMax) {
|
||||||
return errors.New("Redis扫描超时")
|
return errors.New("redis扫描超时")
|
||||||
}
|
}
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil {
|
if keyErr := s.SaveV2(ctx, key, addChan, updateChan); keyErr != nil {
|
||||||
@@ -421,12 +422,18 @@ func (s *sGameAct) SaveV2(ctx context.Context, cacheKey string, addChan, updateC
|
|||||||
// @param update []*entity.GameAct: 更新数据
|
// @param update []*entity.GameAct: 更新数据
|
||||||
// @return err error: 返回错误信息
|
// @return err error: 返回错误信息
|
||||||
func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) {
|
func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct) {
|
||||||
|
tx, err := g.DB().Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Error(ctx, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
//批量写入数据库
|
//批量写入数据库
|
||||||
updateCount := 0
|
var updateCount int64
|
||||||
if len(update) > 0 {
|
if len(update) > 0 {
|
||||||
for _, v := range update {
|
for _, v := range update {
|
||||||
v.UpdatedAt = gtime.Now()
|
v.UpdatedAt = gtime.Now()
|
||||||
updateRes, err2 := g.Model(Name).Where(do.GameAct{
|
updateRes, err2 := tx.Model(Name).Where(do.GameAct{
|
||||||
Uid: v.Uid,
|
Uid: v.Uid,
|
||||||
ActId: v.ActId,
|
ActId: v.ActId,
|
||||||
}).Data(v).Update()
|
}).Data(v).Update()
|
||||||
@@ -438,18 +445,30 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
|||||||
g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
updateCount++
|
||||||
|
if updateCount > TaskMax {
|
||||||
|
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
updateCount = 0
|
||||||
|
tx, err = g.DB().Begin(ctx)
|
||||||
|
}
|
||||||
//删除缓存
|
//删除缓存
|
||||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
updateCount++
|
|
||||||
}
|
}
|
||||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
//循环结束了,最后写入一波
|
||||||
|
g.Log().Debugf(ctx, "Cache2Sql运行结束,act当前更新数据库: %v 条", updateCount)
|
||||||
update = (update)[:0]
|
update = (update)[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
var addCount int64
|
var addCount int64
|
||||||
if len(add) > 0 {
|
if len(add) > 0 {
|
||||||
for _, v := range add {
|
for _, v := range add {
|
||||||
addRes, err2 := g.Model(Name).Data(v).Insert()
|
addRes, err2 := tx.Model(Name).Data(v).Insert()
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
g.Log().Error(ctx, err2)
|
g.Log().Error(ctx, err2)
|
||||||
continue
|
continue
|
||||||
@@ -459,12 +478,26 @@ func (s *sGameAct) Cache2Sql(ctx context.Context, add, update []*entity.GameAct)
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
addCount++
|
addCount++
|
||||||
|
if addCount > TaskMax {
|
||||||
|
g.Log().Debugf(ctx, "超过%v条,act当前写入数据库: %v 条", TaskMax, addCount)
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
addCount = 0
|
||||||
|
tx, err = g.DB().Begin(ctx)
|
||||||
|
}
|
||||||
//删除缓存
|
//删除缓存
|
||||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
}
|
}
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
|
||||||
|
//循环结束了,最后写入一波
|
||||||
|
g.Log().Debugf(ctx, "Cache2Sql运行结束,act当前写入数据库: %v 条", addCount)
|
||||||
add = (add)[:0]
|
add = (add)[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -478,8 +511,14 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
addClosed := false
|
addClosed := false
|
||||||
updateClosed := false
|
updateClosed := false
|
||||||
|
|
||||||
|
tx, err := g.DB().Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Error(ctx, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// 使用链式安全模式
|
// 使用链式安全模式
|
||||||
var db = g.Model(Name).Safe()
|
//var db = tx.Model(Name).Safe()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
//检查是否两个通道都已关闭且为空
|
//检查是否两个通道都已关闭且为空
|
||||||
@@ -493,7 +532,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
addClosed = true // 仅标记关闭,不立即日志
|
addClosed = true // 仅标记关闭,不立即日志
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
addRes, err2 := db.Data(v).Insert()
|
addRes, err2 := tx.Model(Name).Data(v).Insert()
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
g.Log().Error(ctx, err2)
|
g.Log().Error(ctx, err2)
|
||||||
continue
|
continue
|
||||||
@@ -504,6 +543,18 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
}
|
}
|
||||||
row, _ := addRes.RowsAffected()
|
row, _ := addRes.RowsAffected()
|
||||||
addCount += row
|
addCount += row
|
||||||
|
|
||||||
|
if addCount > TaskMax {
|
||||||
|
g.Log().Debugf(ctx, "超过%v条,act当前写入数据库: %v 条", TaskMax, addCount)
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Debugf(ctx, "act当前写入数据库失败:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
addCount = 0
|
||||||
|
tx, err = g.DB().Begin(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
//删除缓存
|
//删除缓存
|
||||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
|
|
||||||
@@ -513,7 +564,7 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
v.UpdatedAt = gtime.Now()
|
v.UpdatedAt = gtime.Now()
|
||||||
updateRes, err2 := db.Where(do.GameAct{
|
updateRes, err2 := tx.Model(Name).Where(do.GameAct{
|
||||||
Uid: v.Uid,
|
Uid: v.Uid,
|
||||||
ActId: v.ActId,
|
ActId: v.ActId,
|
||||||
}).Data(v).Update()
|
}).Data(v).Update()
|
||||||
@@ -525,9 +576,21 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
//g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
//g.Log().Error(ctx, "本次更新为0,更新数据失败: %v", v)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
updateCount++
|
||||||
|
|
||||||
|
if updateCount > TaskMax {
|
||||||
|
g.Log().Debugf(ctx, "超过%v条,act当前更新数据库: %v 条", TaskMax, updateCount)
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
g.Log().Debugf(ctx, "act当前更新数据库失败:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
updateCount = 0
|
||||||
|
tx, err = g.DB().Begin(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
//删除缓存
|
//删除缓存
|
||||||
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
s.DelCacheKey(ctx, v.ActId, v.Uid)
|
||||||
updateCount++
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
g.Log().Debug(ctx, "act协程被上下文取消")
|
g.Log().Debug(ctx, "act协程被上下文取消")
|
||||||
@@ -535,25 +598,28 @@ func (s *sGameAct) Cache2SqlChan(ctx context.Context, addChan, updateChan chan *
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
|
// 仅在所有通道处理完毕后打印最终计数(移除中间冗余日志)
|
||||||
g.Log().Debugf(ctx, "act当前写入数据库: %v 条", addCount)
|
g.Log().Debugf(ctx, "运行结束act当前写入数据库: %v 条", addCount)
|
||||||
g.Log().Debugf(ctx, "act当前更新数据库: %v 条", updateCount)
|
g.Log().Debugf(ctx, "运行结束act当前更新数据库: %v 条", updateCount)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 删除缓存key
|
// 删除缓存key
|
||||||
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
|
func (s *sGameAct) DelCacheKey(ctx context.Context, aid int, uid int64) {
|
||||||
//如果有活跃,跳过删除
|
go func() {
|
||||||
if getBool, _ := pkg.Cache("redis").
|
//如果有活跃,跳过删除
|
||||||
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
|
if getBool, _ := pkg.Cache("redis").
|
||||||
return
|
Contains(ctx, fmt.Sprintf("act:update:%d", uid)); getBool {
|
||||||
}
|
return
|
||||||
|
}
|
||||||
|
|
||||||
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
|
cacheKey := fmt.Sprintf("act:%v:%v", aid, uid)
|
||||||
_, err := g.Redis().Del(ctx, cacheKey)
|
_, err := g.Redis().Del(ctx, cacheKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Error(ctx, err)
|
g.Log().Error(ctx, err)
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 清空GetRedDot缓存
|
// 清空GetRedDot缓存
|
||||||
|
|||||||
@@ -100,9 +100,6 @@ func safeProperty(property map[string]any) {
|
|||||||
if _, ok := v.(string); ok {
|
if _, ok := v.(string); ok {
|
||||||
property[k] = onlyWordRE.ReplaceAllString(gconv.String(v), "*")
|
property[k] = onlyWordRE.ReplaceAllString(gconv.String(v), "*")
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
property[k] = safePropertyRE.ReplaceAllString(gconv.String(v), "*")
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, delkey := range delkeys {
|
for _, delkey := range delkeys {
|
||||||
|
|||||||
@@ -117,9 +117,17 @@ func (r *F64CountRank) IncrScore(id int64, score int64) (curScore float64, err e
|
|||||||
// @param score
|
// @param score
|
||||||
// @return err
|
// @return err
|
||||||
func (r *F64CountRank) SetScore(id int64, score int) (err error) {
|
func (r *F64CountRank) SetScore(id int64, score int) (err error) {
|
||||||
|
//如果分数小于0,则删除
|
||||||
|
if score <= 0 {
|
||||||
|
err = r.DelScore(id)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
// 记录当前时间戳,用于更新成员的最新活动时间。
|
// 记录当前时间戳,用于更新成员的最新活动时间。
|
||||||
now := time.Now().UnixMilli()
|
now := time.Now().UnixMilli()
|
||||||
|
|
||||||
// 将成员的更新时间戳加入到Redis的有序集合中,确保成员的排序依据是最新的活动时间。
|
// 将成员的更新时间戳加入到Redis的有序集合中,确保成员的排序依据是最新的活动时间。
|
||||||
_, err = g.Redis().ZAdd(ctx, r.updateTs, &gredis.ZAddOption{}, gredis.ZAddMember{
|
_, err = g.Redis().ZAdd(ctx, r.updateTs, &gredis.ZAddOption{}, gredis.ZAddMember{
|
||||||
Score: float64(now),
|
Score: float64(now),
|
||||||
@@ -128,16 +136,10 @@ func (r *F64CountRank) SetScore(id int64, score int) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
//如果分数小于0,则删除
|
|
||||||
if score <= 0 {
|
// 覆盖成员的分数
|
||||||
err = r.DelScore(id)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 增加成员的分数,并返回增加后的当前分数。
|
|
||||||
_, err = g.Redis().ZAdd(ctx, r.name, &gredis.ZAddOption{}, gredis.ZAddMember{
|
_, err = g.Redis().ZAdd(ctx, r.name, &gredis.ZAddOption{}, gredis.ZAddMember{
|
||||||
Score: float64(score) + (3*1e13 - float64(now)) / 1e14,
|
Score: float64(score) + (3*1e13-float64(now))/1e14,
|
||||||
Member: id,
|
Member: id,
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -187,6 +189,35 @@ func (r *F64CountRank) DelScore(id int64) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DelByStopRank 删除指定名次后的元素
|
||||||
|
func (r *F64CountRank) DelByStopRank(stop int64) (err error) {
|
||||||
|
// 初始化一个空的int64切片,用于存储指定排名范围内的元素。
|
||||||
|
var members []int64
|
||||||
|
|
||||||
|
// 使用Redis的ZRange命令获取指定排名范围内的元素。
|
||||||
|
// 选项Rev设置为true,表示按照分数从高到低的顺序返回元素。
|
||||||
|
get, err := g.Redis().ZRange(ctx, r.name, stop, 9999999,
|
||||||
|
gredis.ZRangeOption{
|
||||||
|
Rev: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
// 使用Scan方法将获取到的元素扫描到members切片中。
|
||||||
|
err = get.Scan(&members)
|
||||||
|
// 如果扫描过程中出现错误,直接返回错误。
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 遍历members切片,对于每个元素,使用ZRem命令从更新时间集合中删除对应的成员。
|
||||||
|
for _, member := range members {
|
||||||
|
_, err = g.Redis().ZRem(ctx, r.updateTs, member)
|
||||||
|
// 忽略ZRem操作的错误,因为即使元素不存在,ZRem也不会返回错误。
|
||||||
|
}
|
||||||
|
//删除超过9999的数据
|
||||||
|
g.Redis().ZRemRangeByRank(ctx, r.name, 0, -(stop + 1))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// DelByRank 根据排名范围删除元素。
|
// DelByRank 根据排名范围删除元素。
|
||||||
// 该方法使用了Redis的有序集合数据结构,通过ZRange和ZRemRangeByRank命令来实现。
|
// 该方法使用了Redis的有序集合数据结构,通过ZRange和ZRemRangeByRank命令来实现。
|
||||||
// 参数start和stop定义了要删除的排名范围,从start到stop(包括start和stop)。
|
// 参数start和stop定义了要删除的排名范围,从start到stop(包括start和stop)。
|
||||||
|
|||||||
Reference in New Issue
Block a user