定义问题:
我们有这个物联网设备,每个设备都会向我们发送有关汽车位置的日志。我们要计算汽车在线行驶的距离!因此,无论何时出现日志(将其放入队列等之后),我们都会这样做:
type Delta struct {
DeviceId string
time int64
Distance float64
}
var LastLogs = make(map[string]FullLog)
var Distances = make(map[string]Delta)
func addLastLog(l FullLog) {
LastLogs[l.DeviceID] = l
}
func AddToLogPerDay(l FullLog) {
//mutex.Lock()
if val, ok := LastLogs[l.DeviceID]; ok {
if distance, exist := Distances[l.DeviceID]; exist {
x := computingDistance(val, l)
Distances[l.DeviceID] = Delta{
DeviceId: l.DeviceID,
time: distance.time + 1,
Distance: distance.Distance + x,
}
} else {
Distances[l.DeviceID] = Delta{
DeviceId: l.DeviceID,
time: 1,
Distance: 0,
}
}
}
addLastLog(l)
}
基本上使用实用程序功能计算距离!因此在Distances
中,每个设备ID都映射到一定的行程距离!现在这里是问题开始的地方:虽然将此距离添加到Distances map
,但我希望使用go例程将此数据放入数据库中,但是由于有许多设备和许多日志,因此对每个日志执行此查询的操作并不是一个好主意。因此,我需要每5秒钟执行一次操作,这意味着每5秒钟尝试清空添加到地图上的所有最近距离的列表。我写了这个函数:
func UpdateLogPerDayTable() {
for {
for _, distance := range Distances {
logs := model.HourPerDay{}
result := services.CarDBProvider.DB.Table(model.HourPerDay{}.TableName()).
Where("created_at >? AND device_id = ?", getCurrentData(), distance.DeviceId).
Find(&logs)
if result.Error != nil && !result.RecordNotFound() {
log.Infof("Something went wrong while checking the log: %v", result.Error)
} else {
if !result.RecordNotFound() {
logs.CountDistance = distance.Distance
logs.CountSecond = distance.time
err := services.CarDBProvider.DB.Model(&logs).
Update(map[string]interface{}{
"count_second": logs.CountSecond,
"count_distance": logs.CountDistance,
})
if err.Error != nil {
log.Infof("Something went wrong while updating the log: %v", err.Error)
}
} else if result.RecordNotFound() {
dayLog := model.HourPerDay{
Model: gorm.Model{},
DeviceId: distance.DeviceId,
CountSecond: int64(distance.time),
CountDistance: distance.Distance,
}
err := services.CarDBProvider.DB.Create(&dayLog)
if err.Error != nil {
log.Infof("Something went wrong while adding the log: %v", err.Error)
}
}
}
}
time.Sleep(time.Second * 5)
}
}
在另一个执行例程中称为go utlis.UpdateLogPerDayTable()
。但是这里有很多问题:
Distances
,因此,当我将其添加到另一个例程中时,我会在其他地方读取它,一切都OK!(问题是我想使用go通道并且没有任何想法怎么做)抱歉,如果我的解释还不够,但我确实需要一些帮助。专用于代码实现
Go在多个通道上使用for / select具有非常酷的模式。这样,您就可以使用超时和最大记录大小来批量写入距离。使用此模式需要使用通道。
第一件事是将距离建模为渠道:
distances := make(chan Delta)
然后跟踪当前批次
var deltas []Delta
然后
ticker := time.NewTicker(time.Second * 5)
var deltas []Delta
for {
select {
case <-ticker.C:
// 5 seconds up flush to db
// reset deltas
case d := <-distances:
deltas = append(deltas, d)
if len(deltas) >= maxDeltasPerFlush {
// flush
// reset deltas
}
}
}
我不知道如何保护距离,所以当我将其添加到另一个例行程序,我在其他地方读过,一切正常!(问题是我想使用转到频道,但不知道如何操作)
如果要保留地图并共享内存,则需要使用mutual exclusion (mutex)在go例程之间同步访问,以保护它。使用channel可以将副本发送到通道,而无需在Delta对象之间进行同步。根据您的体系结构,您还可以创建通过通道连接的go例程的管道,这样可以使仅一个go例程(monitor go routine)访问Delta
,从而也无需进行同步。
如何解决此问题?
使用通道作为将Deltas
传递到不同的go例程的原语:)
可能我将添加一个redis来存储所有或在线的设备,因此我可以更快地执行选择查询,而只是更新实际数据库。还添加了Redis的到期时间,因此如果设备未发送和数据一段时间,它消失了!我应该在哪里放置此代码?
这取决于您完成的架构。您可以为选择操作编写一个decorator,该操作将首先检查redis,然后转到DB。此功能的客户端不必知道这一点。可以用相同的方式执行写操作:写入持久性存储,然后使用缓存的值和到期时间写回redis。使用装饰器,客户端不需要知道这一点,他们只需执行读写操作,就可以在装饰器内部实现缓存逻辑。有很多方法可以实现,并且很大程度上取决于实现的解决位置。