golang (4) golang 操作mongdb

时间:2021-07-09 16:09:22

1. 数据按照时间聚合操作

1.1 正常的数据结构
{
"_id" : ObjectId("5cac8d7b1202708adf5d4b64"),
"time" : ISODate("2019-04-09T20:18:03.308Z"),
"ip" : "10.10.23.2",
"metrics" : "bm",
"count" : NumberLong(3)
}
{
"_id" : ObjectId("5cac8d7c1202708adf5d4c75"),
"time" : ISODate("2019-04-09T20:18:04.062Z"),
"ip" : "10.13.23.2",
"metrics" : "bm",
"count" : NumberLong(0)
}
{
"_id" : ObjectId("5cac92a01202708adf613c88"),
"time" : ISODate("2019-04-09T20:40:00.024Z"),
"ip" : "10.13.23.2",
"metrics" : "bcc.proc.profile-bm",
"count" : NumberLong(0)
}
{
"_id" : ObjectId("5cac92a11202708adf613cb5"),
"time" : ISODate("2019-04-09T20:40:01.007Z"),
"ip" : "10.3.21.204",
"metrics" : "bcc.proc.profile-bm",
"count" : NumberLong(0)
}
1.2 mongo命令行查询

按照记录的时间指标,按照1分钟的纬度,进行统计聚合。

In the db.collection.aggregate method and db.aggregate method, pipeline stages appear in an array.

Documents pass through the stages in sequence.

aggregate函数中可以跟表达式,用到的表达式的如下所示:

$group Groups input documents by a specified identifier expression and applies the accumulator expression(s), if specified, to each group. Consumes all input documents and outputs one document per each distinct group. The output documents only contain the identifier field and, if specified, accumulated fields.

$match Filters the document stream to allow only matching documents to pass unmodified into the next pipeline stage. $match uses standard MongoDB queries. For each input document, outputs either one document (a match) or zero documents (no match).

$project Reshapes each document in the stream, such as by adding new fields or removing existing fields. For each input document, outputs one document.

$sort Reorders the document stream by a specified sort key. Only the order changes; the documents remain unmodified. For each input document, outputs one document.

db.count.aggregate(
{"$match":{ "time": {'$gte': ISODate("2019-04-09T14:00:00Z") ,'$lt':ISODate("2019-04-09T24:35:00Z")} }},
{"$group": { "_id": { "$subtract": [ { "$subtract": [ "$time", new Date("1970-01-01") ] }, { "$mod":[{"$subtract": ["$time", new Date("1970-01-01")]}, 1000 * 60 * 1 ]}]}, "total": {'$sum': '$count'}, } } ,
{"$project": { "_id": 0, 'datetime': {'$add': [new Date(0), '$_id']}, "total":1}},
{"$sort": { 'datetime': 1 }} )
1.3 golang mgo包查询:

golang 中mgo package中实现了mongo的聚合函数

func (c *Collection) Pipe(pipeline interface{}) *Pipe
For example:
pipe := collection.Pipe([]bson.M{{"$match": bson.M{"name": "Otavio"}}})
iter := pipe.Iter()

Pipe prepares a pipeline to aggregate. The pipeline document must be a slice built in terms of the aggregation framework language.

构造bson.M结构体,bson.M是map类型的定义。
这里主要解决三个问题:
1. 结构式的表达,整个结构式细分为子结构式
2. mongo $subtrace函数中需要的表达式为int类型的,主要将相应的数据类型转换为数值类型的
3. $mod表达式内部为表达式和数值的混合类型,用[]interface{}来表示, interface{}相当于C中的void*类型
m := []bson.M{}
type list []interface{}
date := time.Unix(0, 0)
sub := list{"$time", date}
sub_base := bson.M{"$subtract": sub} mode_list := list{sub_base, duration}
mode := bson.M{"$mod": mode_list} sub_array := []bson.M{}
sub_array = append(sub_array, sub_base)
sub_array = append(sub_array, mode) m = append(m, bson.M{"$match": bson.M{"time": bson.M{"$gte": bt, "$lte": et}, "ip": ip}})
m = append(m, bson.M{"$group": bson.M{"_id": bson.M{"$subtract": sub_array}, "count": bson.M{"$sum": "$count"}}}) add_list := list{date, "$_id"}
m = append(m, bson.M{"$project": bson.M{"_id": 0, "time": bson.M{"$add": add_list}, "count": 1}})
m = append(m, bson.M{"$sort": bson.M{"time": 1}})
count := make([]CountResult, 0)
err = count_db.Find(m, &count)

参考文档

  1. mongodb官方文档
  2. golang package